Posted to commits@flink.apache.org by go...@apache.org on 2022/08/10 06:05:14 UTC

[flink] branch master updated (d0a5023f989 -> c5b5d436843)

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from d0a5023f989 [FLINK-28632][sql-gateway][hive] Allow to GetColumns/GetPrimaryKeys/GetTableTypes in the HiveServer2 Endpoint
     new 8b25b969d41 [FLINK-28848][table-planner] Introduces LOOKUP join hint to support delayed retry for lookup join (table alias unsupported in hint)
     new fa6d62dd6bb [hotfix][table-planner] Use Scala isInstanceOf to check lookup function type instead of one-level parent class comparison in LookupJoinCodeGenerator
     new 3a2fc5ef34f [hotfix][runtime] Do last attempt without successfully canceling the retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator
     new c5b5d436843 [FLINK-28849][table-planner] Fix errors when enabling retry on async lookup and add more tests

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
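
The headline change in these revisions is the new LOOKUP join hint, which lets a query ask
for delayed retries when a lookup join misses (for example due to delayed updates in the
external system). A minimal, hypothetical usage sketch: the table and column names are
illustrative, while the hint keys mirror the integration tests added by these commits:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    // sketch only: 'Orders' and 'Customers' are assumed to be registered tables
    val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    val result = tEnv.sqlQuery(
      """
        |SELECT /*+ LOOKUP('table'='Customers',
        |   'retry-predicate'='lookup_miss',
        |   'retry-strategy'='fixed_delay',
        |   'fixed-delay'='5 ms',
        |   'max-attempts'='3') */
        |  O.order_id, C.name
        |FROM Orders AS O
        |JOIN Customers FOR SYSTEM_TIME AS OF O.proctime AS C
        |ON O.cust_id = C.id
        |""".stripMargin)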


Summary of changes:
 .../api/operators/async/AsyncWaitOperator.java     |   8 +-
 .../table/planner/hint/FlinkHintStrategies.java    |  90 ++++++
 .../flink/table/planner/hint/FlinkHints.java       |   6 +-
 .../flink/table/planner/hint/JoinStrategy.java     |  11 +-
 .../table/planner/hint/LookupJoinHintOptions.java  | 139 +++++++++
 .../plan/nodes/exec/batch/BatchExecLookupJoin.java |   2 +
 .../nodes/exec/common/CommonExecLookupJoin.java    |  85 +++---
 .../plan/nodes/exec/spec/LookupJoinHintSpec.java   | 239 ++++++++++++++++
 .../nodes/exec/stream/StreamExecLookupJoin.java    |   5 +
 .../planner/plan/optimize/JoinHintResolver.java    |  20 +-
 .../table/planner/plan/utils/LookupJoinUtil.java   | 312 +++++++++++++++------
 .../planner/codegen/LookupJoinCodeGenerator.scala  |   5 +-
 .../physical/common/CommonPhysicalLookupJoin.scala |  11 +-
 .../physical/stream/StreamPhysicalLookupJoin.scala |  15 +-
 .../BatchCommonSubGraphBasedOptimizer.scala        |  14 +-
 .../optimize/CommonSubGraphBasedOptimizer.scala    |  11 +-
 .../stream/StreamPhysicalLookupJoinRule.scala      |  16 +-
 ...rJoinHintWithInvalidPropagationShuttleTest.java | 101 +------
 ...nHintWithInvalidPropagationShuttleTestBase.java | 128 +++++++++
 ...pJoinHintWithInvalidPropagationShuttleTest.java | 149 ++++++++++
 .../factories/TestValuesRuntimeFunctions.java      |  91 ++++++
 .../planner/factories/TestValuesTableFactory.java  |  52 +++-
 .../batch}/BroadcastJoinHintTest.java              |   2 +-
 .../hints => hints/batch}/JoinHintTestBase.java    |   4 +-
 .../batch}/NestLoopJoinHintTest.java               |   2 +-
 .../batch}/ShuffleHashJoinHintTest.java            |   2 +-
 .../batch}/ShuffleMergeJoinHintTest.java           |   2 +-
 .../exec/serde/LookupJoinHintSpecSerdeTest.java    |  51 ++++
 .../nodes/exec/spec/LookupJoinHintSpecTest.java    |  98 +++++++
 .../nodes/exec/stream/LookupJoinJsonPlanTest.java  |  68 ++++-
 .../optimize/ClearQueryBlockAliasResolverTest.java |   2 +-
 .../plan/optimize/JoinHintResolverTest.java        |   2 +-
 ...upJoinHintWithInvalidPropagationShuttleTest.xml | 127 +++++++++
 .../batch}/BroadcastJoinHintTest.xml               |   0
 .../hints => hints/batch}/NestLoopJoinHintTest.xml |   0
 .../batch}/ShuffleHashJoinHintTest.xml             |   0
 .../batch}/ShuffleMergeJoinHintTest.xml            |   0
 .../testJoinTemporalTable.out                      |   2 +-
 ....out => testJoinTemporalTableWithAsyncHint.out} |  10 +-
 ...out => testJoinTemporalTableWithAsyncHint2.out} |  10 +-
 ...=> testJoinTemporalTableWithAsyncRetryHint.out} |  14 +-
 ...> testJoinTemporalTableWithAsyncRetryHint2.out} |  14 +-
 ...testJoinTemporalTableWithProjectionPushDown.out |   2 +-
 ....out => testJoinTemporalTableWithRetryHint.out} |  13 +-
 .../plan/stream/sql/join/LookupJoinTest.xml        | 226 +++++++++++++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   4 +-
 .../plan/stream/sql/join/LookupJoinTest.scala      | 218 +++++++++++++-
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 103 ++++++-
 .../runtime/stream/sql/LookupJoinITCase.scala      | 105 ++++++-
 .../operators/join/lookup/ResultRetryStrategy.java |  69 +++++
 .../lookup/RetryableLookupFunctionDelegator.java   |  83 ++++++
 .../join/RetryableLookupFunctionDelegatorTest.java | 103 +++++++
 52 files changed, 2541 insertions(+), 305 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/LookupJoinHintOptions.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpec.java
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java
 rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/BroadcastJoinHintTest.java (94%)
 rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/JoinHintTestBase.java (99%)
 rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/NestLoopJoinHintTest.java (94%)
 rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/ShuffleHashJoinHintTest.java (94%)
 rename flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/ShuffleMergeJoinHintTest.java (94%)
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupJoinHintSpecSerdeTest.java
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpecTest.java
 create mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml
 rename flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/BroadcastJoinHintTest.xml (100%)
 rename flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/NestLoopJoinHintTest.xml (100%)
 rename flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/ShuffleHashJoinHintTest.xml (100%)
 rename flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/{batch/sql/join/hints => hints/batch}/ShuffleMergeJoinHintTest.xml (100%)
 copy flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/{testJoinTemporalTable.out => testJoinTemporalTableWithAsyncHint.out} (98%)
 copy flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/{testJoinTemporalTable.out => testJoinTemporalTableWithAsyncHint2.out} (98%)
 copy flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/{testJoinTemporalTable.out => testJoinTemporalTableWithAsyncRetryHint.out} (97%)
 copy flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/{testJoinTemporalTable.out => testJoinTemporalTableWithAsyncRetryHint2.out} (97%)
 copy flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/{testJoinTemporalTable.out => testJoinTemporalTableWithRetryHint.out} (97%)
 create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java
 create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableLookupFunctionDelegator.java
 create mode 100644 flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableLookupFunctionDelegatorTest.java


[flink] 04/04: [FLINK-28849][table-planner] Fix errors when enabling retry on async lookup and add more tests

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5b5d4368437fc98b2f5ca31b1f1c6cf3e4ce263
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Aug 9 18:14:08 2022 +0800

    [FLINK-28849][table-planner] Fix errors when enabling retry on async lookup and add more tests
    
    Disable retry on async lookup because two problems need to be resolved first
    
    This closes #20482
---
 .../nodes/exec/common/CommonExecLookupJoin.java    |  35 +++----
 .../table/planner/plan/utils/LookupJoinUtil.java   |  13 ++-
 .../physical/common/CommonPhysicalLookupJoin.scala |   7 +-
 .../factories/TestValuesRuntimeFunctions.java      |  91 ++++++++++++++++++
 .../planner/factories/TestValuesTableFactory.java  |  52 ++++++++--
 .../plan/stream/sql/join/LookupJoinTest.xml        |   2 +-
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 103 +++++++++++++++++++-
 .../runtime/stream/sql/LookupJoinITCase.scala      | 105 ++++++++++++++++++++-
 8 files changed, 367 insertions(+), 41 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index 52192ee79ad..d00c888e1fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -542,30 +542,17 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                             isLeftOuterJoin,
                             asyncLookupOptions.asyncBufferCapacity);
         }
-        /**
-         * why not implements async-retry directly in AsyncLookupFunction ? - because the active
-         * sleeping on async callback thread will occupy the task cpu time while the retry support
-         * in async data stream api provides a more efficient way via processing time service which
-         * does not occupy callback thread. Both AsyncLookupFunction AsyncTableFunction can support
-         * retry. does not occupy callback thread. Both AsyncLookupFunction AsyncTableFunction can
-         * support retry.
-         */
-        if (null != joinHintSpec) {
-            // simplify code here, not check whether ResultRetryStrategy is NO_RETRY_STRATEGY or not
-            // because AsyncWaitOperator has short-path optimization during compile time.
-            return new AsyncWaitOperatorFactory<>(
-                    asyncFunc,
-                    asyncLookupOptions.asyncTimeout,
-                    asyncLookupOptions.asyncBufferCapacity,
-                    convert(asyncLookupOptions.asyncOutputMode),
-                    joinHintSpec.toRetryStrategy());
-        } else {
-            return new AsyncWaitOperatorFactory<>(
-                    asyncFunc,
-                    asyncLookupOptions.asyncTimeout,
-                    asyncLookupOptions.asyncBufferCapacity,
-                    convert(asyncLookupOptions.asyncOutputMode));
-        }
+        // TODO async retry to be supported: retry cannot be enabled directly on 'AsyncWaitOperator'
+        // for two reasons: 1. AsyncLookupJoinRunner keeps a 'stateful' resultFutureBuffer bound to
+        // each input record (it is not re-enterable); 2. a retry cannot look up a new value once an
+        // empty result was cached by the new AsyncCachingLookupFunction it is chained with. These
+        // two issues should be resolved before async retry can be enabled.
+
+        return new AsyncWaitOperatorFactory<>(
+                asyncFunc,
+                asyncLookupOptions.asyncTimeout,
+                asyncLookupOptions.asyncBufferCapacity,
+                convert(asyncLookupOptions.asyncOutputMode));
     }
 
     private AsyncDataStream.OutputMode convert(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 833eb1908a7..529f79b706e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -400,18 +400,23 @@ public final class LookupJoinUtil {
                 || lookupJoinHintSpec.isAsync();
     }
 
-    public static boolean isAsyncLookup(RelOptTable temporalTable, Collection<Integer> lookupKeys) {
+    public static boolean isAsyncLookup(
+            RelOptTable temporalTable,
+            Collection<Integer> lookupKeys,
+            LookupJoinHintSpec lookupJoinHintSpec) {
+        boolean preferAsync = preferAsync(lookupJoinHintSpec);
         if (temporalTable instanceof TableSourceTable) {
             LookupTableSource.LookupRuntimeProvider provider =
                     getLookupRuntimeProvider(temporalTable, lookupKeys);
-            return provider instanceof AsyncLookupFunctionProvider
-                    || provider instanceof AsyncTableFunctionProvider;
+            return preferAsync
+                    && (provider instanceof AsyncLookupFunctionProvider
+                            || provider instanceof AsyncTableFunctionProvider);
         } else if (temporalTable instanceof LegacyTableSourceTable) {
             LegacyTableSourceTable<?> legacyTableSourceTable =
                     (LegacyTableSourceTable<?>) temporalTable;
             LookupableTableSource<?> lookupableTableSource =
                     (LookupableTableSource<?>) legacyTableSourceTable.tableSource();
-            return lookupableTableSource.isAsyncEnabled();
+            return preferAsync && lookupableTableSource.isAsyncEnabled();
         }
         throw new TableException(
                 String.format(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
index 568909ddd5b..c5fde386f73 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
@@ -161,8 +161,13 @@ abstract class CommonPhysicalLookupJoin(
       case t: LegacyTableSourceTable[_] => t.tableIdentifier
     }
 
+    // The lookup function may not be the final choice at runtime because upsert materialize info
+    // is not available here. This can be made consistent once the planner offers enough info.
     val isAsyncEnabled: Boolean =
-      LookupJoinUtil.isAsyncLookup(temporalTable, allLookupKeys.keys.map(Int.box).toList.asJava)
+      LookupJoinUtil.isAsyncLookup(
+        temporalTable,
+        allLookupKeys.keys.map(Int.box).toList.asJava,
+        lookupHintSpec.orNull)
 
     super
       .explainTerms(pw)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index b915529ef62..510efcd09ab 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -707,4 +707,95 @@ final class TestValuesRuntimeFunctions {
                     });
         }
     }
+
+    /**
+     * The {@link TestNoLookupUntilNthAccessLookupFunction} extends {@link
+     * TestValuesLookupFunction}; it does no real lookup for a key (it returns null immediately)
+     * until the number of lookups for that key exceeds the predefined threshold 'lookupThreshold'.
+     */
+    public static class TestNoLookupUntilNthAccessLookupFunction extends TestValuesLookupFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        /** The per-key access count that must be exceeded before a real lookup happens. */
+        private final int lookupThreshold;
+
+        private transient Map<RowData, Integer> accessCounter;
+
+        protected TestNoLookupUntilNthAccessLookupFunction(
+                List<Row> data,
+                int[] lookupIndices,
+                LookupTableSource.DataStructureConverter converter,
+                int lookupThreshold) {
+            super(data, lookupIndices, converter);
+            this.lookupThreshold = lookupThreshold;
+        }
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            super.open(context);
+            accessCounter = new HashMap<>();
+        }
+
+        protected int counter(RowData key) {
+            int currentCnt = accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
+            accessCounter.put(key, currentCnt);
+            return currentCnt;
+        }
+
+        @Override
+        public Collection<RowData> lookup(RowData keyRow) throws IOException {
+            int currentCnt = counter(keyRow);
+            if (currentCnt <= lookupThreshold) {
+                return null;
+            }
+            return super.lookup(keyRow);
+        }
+    }
+
+    /**
+     * The {@link TestNoLookupUntilNthAccessAsyncLookupFunction} extends {@link
+     * AsyncTestValueLookupFunction}; it does no real lookup for a key (it returns an empty result
+     * immediately) until the number of lookups for that key exceeds the predefined 'lookupThreshold'.
+     */
+    public static class TestNoLookupUntilNthAccessAsyncLookupFunction
+            extends AsyncTestValueLookupFunction {
+        private static final long serialVersionUID = 1L;
+        private static Collection<RowData> emptyResult = Collections.emptyList();
+
+        /** The per-key access count that must be exceeded before a real lookup happens. */
+        private final int lookupThreshold;
+
+        private transient Map<RowData, Integer> accessCounter;
+
+        public TestNoLookupUntilNthAccessAsyncLookupFunction(
+                List<Row> data,
+                int[] lookupIndices,
+                LookupTableSource.DataStructureConverter converter,
+                int lookupThreshold) {
+            super(data, lookupIndices, converter);
+            this.lookupThreshold = lookupThreshold;
+        }
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            super.open(context);
+            accessCounter = new HashMap<>();
+        }
+
+        protected int counter(RowData key) {
+            int currentCnt = accessCounter.computeIfAbsent(key, cnt -> 0) + 1;
+            accessCounter.put(key, currentCnt);
+            return currentCnt;
+        }
+
+        @Override
+        public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+            int currentCnt = counter(keyRow);
+            if (currentCnt <= lookupThreshold) {
+                return CompletableFuture.supplyAsync(() -> emptyResult);
+            }
+            return super.asyncLookup(keyRow);
+        }
+    }
 }
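
For context, the threshold-gated test functions above are activated through a new test-only
'start-lookup-threshold' option of the values connector (added to TestValuesTableFactory in the
next file). A hypothetical DDL mirroring the ITCase setup; `tEnv` and `dataId` (registered via
TestValuesTableFactory.registerData) are assumed to exist:

    tEnv.executeSql(
      s"""
         |CREATE TABLE user_table_with_lookup_threshold2 (
         |  `age` INT,
         |  `id` BIGINT,
         |  `name` STRING
         |) WITH (
         |  'connector' = 'values',
         |  'data-id' = '$dataId',
         |  'start-lookup-threshold' = '2'
         |)
         |""".stripMargin)
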
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 60967f8494d..949b9f57e57 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -86,14 +86,18 @@ import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingOutputFormat;
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AppendingSinkFunction;
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.AsyncTestValueLookupFunction;
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.KeyedUpsertingSinkFunction;
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.RetractingSinkFunction;
+import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessAsyncLookupFunction;
+import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestNoLookupUntilNthAccessLookupFunction;
 import org.apache.flink.table.planner.factories.TestValuesRuntimeFunctions.TestValuesLookupFunction;
 import org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction;
 import org.apache.flink.table.planner.functions.aggfunctions.CountAggFunction;
@@ -317,6 +321,13 @@ public final class TestValuesTableFactory
     private static final ConfigOption<String> LOOKUP_FUNCTION_CLASS =
             ConfigOptions.key("lookup-function-class").stringType().noDefaultValue();
 
+    private static final ConfigOption<Integer> LOOKUP_THRESHOLD =
+            ConfigOptions.key("start-lookup-threshold")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The per-key access count threshold: the backing lookup function does no real"
+                                    + " lookup for a key (returns null immediately) until its lookup count exceeds it");
     private static final ConfigOption<Boolean> ASYNC_ENABLED =
             ConfigOptions.key("async").booleanType().defaultValue(false);
 
@@ -416,6 +427,7 @@ public final class TestValuesTableFactory
         boolean failingSource = helper.getOptions().get(FAILING_SOURCE);
         int numElementToSkip = helper.getOptions().get(SOURCE_NUM_ELEMENT_TO_SKIP);
         boolean internalData = helper.getOptions().get(INTERNAL_DATA);
+        int lookupThreshold = helper.getOptions().get(LOOKUP_THRESHOLD);
         DefaultLookupCache cache = null;
         if (helper.getOptions().get(CACHE_TYPE).equals(LookupOptions.LookupCacheType.PARTIAL)) {
             cache = DefaultLookupCache.fromConfig(helper.getOptions());
@@ -540,7 +552,8 @@ public final class TestValuesTableFactory
                         readableMetadata,
                         null,
                         cache,
-                        reloadTrigger);
+                        reloadTrigger,
+                        lookupThreshold);
             }
         } else {
             try {
@@ -622,6 +635,7 @@ public final class TestValuesTableFactory
                         TABLE_SOURCE_CLASS,
                         FAILING_SOURCE,
                         LOOKUP_FUNCTION_CLASS,
+                        LOOKUP_THRESHOLD,
                         ASYNC_ENABLED,
                         DISABLE_LOOKUP,
                         TABLE_SOURCE_CLASS,
@@ -1496,6 +1510,7 @@ public final class TestValuesTableFactory
         private final @Nullable LookupCache cache;
         private final @Nullable CacheReloadTrigger reloadTrigger;
         private final boolean isAsync;
+        private final int lookupThreshold;
 
         private TestValuesScanLookupTableSource(
                 DataType producedDataType,
@@ -1517,7 +1532,8 @@ public final class TestValuesTableFactory
                 Map<String, DataType> readableMetadata,
                 @Nullable int[] projectedMetadataFields,
                 @Nullable LookupCache cache,
-                @Nullable CacheReloadTrigger reloadTrigger) {
+                @Nullable CacheReloadTrigger reloadTrigger,
+                int lookupThreshold) {
             super(
                     producedDataType,
                     changelogMode,
@@ -1539,6 +1555,7 @@ public final class TestValuesTableFactory
             this.isAsync = isAsync;
             this.cache = cache;
             this.reloadTrigger = reloadTrigger;
+            this.lookupThreshold = lookupThreshold;
         }
 
         @SuppressWarnings({"unchecked", "rawtypes"})
@@ -1549,7 +1566,11 @@ public final class TestValuesTableFactory
                 try {
                     Class<?> clazz = Class.forName(lookupFunctionClass);
                     Object udtf = InstantiationUtil.instantiate(clazz);
-                    if (udtf instanceof TableFunction) {
+                    if (udtf instanceof LookupFunction) {
+                        return LookupFunctionProvider.of((LookupFunction) udtf);
+                    } else if (udtf instanceof AsyncLookupFunction) {
+                        return AsyncLookupFunctionProvider.of((AsyncLookupFunction) udtf);
+                    } else if (udtf instanceof TableFunction) {
                         return TableFunctionProvider.of((TableFunction) udtf);
                     } else {
                         return AsyncTableFunctionProvider.of((AsyncTableFunction) udtf);
@@ -1582,7 +1603,7 @@ public final class TestValuesTableFactory
                     context.createDataStructureConverter(producedDataType);
             if (isAsync) {
                 AsyncTestValueLookupFunction asyncLookupFunction =
-                        new AsyncTestValueLookupFunction(data, lookupIndices, converter);
+                        getTestValuesAsyncLookupFunction(data, lookupIndices, converter);
                 if (cache == null) {
                     return AsyncLookupFunctionProvider.of(asyncLookupFunction);
                 } else {
@@ -1591,7 +1612,7 @@ public final class TestValuesTableFactory
                 }
             } else {
                 TestValuesLookupFunction lookupFunction =
-                        new TestValuesLookupFunction(data, lookupIndices, converter);
+                        getTestValuesLookupFunction(data, lookupIndices, converter);
                 if (cache != null) {
                     return PartialCachingLookupProvider.of(lookupFunction, cache);
                 } else if (reloadTrigger != null) {
@@ -1608,6 +1629,24 @@ public final class TestValuesTableFactory
             }
         }
 
+        private AsyncTestValueLookupFunction getTestValuesAsyncLookupFunction(
+                List<Row> data, int[] lookupIndices, DataStructureConverter converter) {
+            if (lookupThreshold > 0) {
+                return new TestNoLookupUntilNthAccessAsyncLookupFunction(
+                        data, lookupIndices, converter, lookupThreshold);
+            }
+            return new AsyncTestValueLookupFunction(data, lookupIndices, converter);
+        }
+
+        private TestValuesLookupFunction getTestValuesLookupFunction(
+                List<Row> data, int[] lookupIndices, DataStructureConverter converter) {
+            if (lookupThreshold > 0) {
+                return new TestNoLookupUntilNthAccessLookupFunction(
+                        data, lookupIndices, converter, lookupThreshold);
+            }
+            return new TestValuesLookupFunction(data, lookupIndices, converter);
+        }
+
         @Override
         public DynamicTableSource copy() {
             return new TestValuesScanLookupTableSource(
@@ -1630,7 +1669,8 @@ public final class TestValuesTableFactory
                     readableMetadata,
                     projectedMetadataFields,
                     cache,
-                    reloadTrigger);
+                    reloadTrigger,
+                    lookupThreshold);
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index e1de434979f..4f6a308b9b5 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -193,7 +193,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 8826a2ccf46..cb66ed0aff7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -76,6 +76,10 @@ class AsyncLookupJoinITCase(
 
     createScanTable("src", data)
     createLookupTable("user_table", userData)
+    // lookup will start from the 2nd time, first lookup will always get null result
+    createLookupTable("user_table_with_lookup_threshold2", userData, 2)
+    // lookup will start from the 3rd time, first lookup will always get null result
+    createLookupTable("user_table_with_lookup_threshold3", userData, 3)
   }
 
   @After
@@ -88,7 +92,10 @@ class AsyncLookupJoinITCase(
     }
   }
 
-  private def createLookupTable(tableName: String, data: List[Row]): Unit = {
+  private def createLookupTable(
+      tableName: String,
+      data: List[Row],
+      lookupThreshold: Int = -1): Unit = {
     if (legacyTableSource) {
       val userSchema = TableSchema
         .builder()
@@ -111,6 +118,10 @@ class AsyncLookupJoinITCase(
              |  '${LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()}' = '${Long.MaxValue}',
              |""".stripMargin
         else ""
+      val lookupThresholdOption = if (lookupThreshold > 0) {
+        s"'start-lookup-threshold'='$lookupThreshold',"
+      } else ""
+
       tEnv.executeSql(s"""
                          |CREATE TABLE $tableName (
                          |  `age` INT,
@@ -118,6 +129,7 @@ class AsyncLookupJoinITCase(
                          |  `name` STRING
                          |) WITH (
                          |  $cacheOptions
+                         |  $lookupThresholdOption
                          |  'connector' = 'values',
                          |  'data-id' = '$dataId',
                          |  'async' = 'true'
@@ -126,6 +138,19 @@ class AsyncLookupJoinITCase(
     }
   }
 
+  // TODO a base or utility class would be better to share this code with LookupJoinITCase
+  private def getAsyncRetryLookupHint(lookupTable: String, maxAttempts: Int): String = {
+    s"""
+       |/*+ LOOKUP('table'='$lookupTable', 
+       | 'async'='true', 
+       | 'time-out'='300s',
+       | 'retry-predicate'='lookup_miss',
+       | 'retry-strategy'='fixed_delay',
+       | 'fixed-delay'='1 ms',
+       | 'max-attempts'='$maxAttempts')
+       |*/""".stripMargin
+  }
+
   private def createScanTable(tableName: String, data: List[Row]): Unit = {
     val dataId = TestValuesTableFactory.registerData(data)
     tEnv.executeSql(s"""
@@ -289,7 +314,7 @@ class AsyncLookupJoinITCase(
     // only legacy source can provide both sync and async functions
     if (!legacyTableSource) {
       thrown.expectMessage(
-        "Require a synchronous lookup function due to planner's requirement but no available functions")
+        "Required sync lookup function by planner, but table [default_catalog, default_database, user_table]does not offer a valid lookup function")
       thrown.expect(classOf[TableException])
     }
     tEnv.getConfig.set(
@@ -415,7 +440,79 @@ class AsyncLookupJoinITCase(
     new java.lang.Long(l)
   }
 
-// TODO add case with async and retry in FLINK-28849
+  @Test
+  def testAsyncJoinTemporalTableWithRetry(): Unit = {
+    val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table", 2)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T
+                   |JOIN user_table for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    // the result is deterministic because the test data of lookup source is static
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = {
+    val maxRetryOnceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold3", 1)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryOnceHint T.id, T.len, T.content, D.name FROM src AS T
+                   |JOIN user_table_with_lookup_threshold3 for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = if (legacyTableSource) {
+      // the legacy lookup source does not support the lookup threshold option; real async
+      // lookup functions (both the new and the legacy API) do support retry
+      Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    } else {
+      // the user_table_with_lookup_threshold3 will return null result before 3rd lookup
+      Seq()
+    }
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit = {
+    // When async retry is enabled, the async operator needs enough time to do the delayed retry
+    // work; but because the bounded test source finishes quickly, there is no guarantee on the
+    // number of attempts; only at least one retry per element is ensured in the current version,
+    // so a lookup threshold of at most 2 is used here to get a deterministic result
+    val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold2", 2)
+
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T
+                   |JOIN user_table_with_lookup_threshold2 for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = if (legacyTableSource) {
+      // the legacy lookup source does not support the lookup threshold option; real async
+      // lookup functions (both the new and the legacy API) do support retry
+      Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    } else {
+      // TODO retry on async lookup is not supported yet; this expectation should be updated once it is
+      Seq()
+    }
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 
 }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index c871ab24c34..edea5d48915 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -81,6 +81,10 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
     createScanTable("nullable_src", dataWithNull)
     createLookupTable("user_table", userData)
     createLookupTable("nullable_user_table", userDataWithNull)
+    // lookup will start from the 2nd time, first lookup will always get null result
+    createLookupTable("user_table_with_lookup_threshold2", userData, 2)
+    // lookup will start from the 3rd time, first lookup will always get null result
+    createLookupTable("user_table_with_lookup_threshold3", userData, 3)
     createLookupTableWithComputedColumn("userTableWithComputedColumn", userData)
   }
 
@@ -93,7 +97,11 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
     }
   }
 
-  private def createLookupTable(tableName: String, data: List[Row]): Unit = {
+  /** The lookupThreshold only takes effect for the new table source (not the legacy one). */
+  private def createLookupTable(
+      tableName: String,
+      data: List[Row],
+      lookupThreshold: Int = -1): Unit = {
     if (legacyTableSource) {
       val userSchema = TableSchema
         .builder()
@@ -122,6 +130,9 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
              |  '${LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key()}' = '${Long.MaxValue}',
              |""".stripMargin
         else ""
+      val lookupThresholdOption = if (lookupThreshold > 0) {
+        s"'start-lookup-threshold'='$lookupThreshold',"
+      } else ""
 
       tEnv.executeSql(s"""
                          |CREATE TABLE $tableName (
@@ -130,6 +141,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
                          |  `name` STRING
                          |) WITH (
                          |  $cacheOptions
+                         |  $lookupThresholdOption
                          |  'connector' = 'values',
                          |  'data-id' = '$dataId'
                          |)
@@ -708,8 +720,97 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
     val expected = Seq("3", "8", "9")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
-  // TODO add case with retry hint in FLINK-28849
 
+  private def getRetryLookupHint(lookupTable: String, maxAttempts: Int): String = {
+    s"""
+       |/*+ LOOKUP('table'='$lookupTable', 'retry-predicate'='lookup_miss',
+       | 'retry-strategy'='fixed_delay',
+       |  'fixed-delay'='5 ms',
+       |   'max-attempts'='$maxAttempts')
+       |*/""".stripMargin
+  }
+
+  @Test
+  def testJoinTemporalTableWithRetry(): Unit = {
+    val maxRetryTwiceHint = getRetryLookupHint("user_table", 2)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T
+                   |JOIN user_table for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    // the result is deterministic because the test data of lookup source is static
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = {
+    val maxRetryOnceHint = getRetryLookupHint("user_table_with_lookup_threshold3", 1)
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryOnceHint T.id, T.len, T.content, D.name FROM src AS T
+                   |JOIN user_table_with_lookup_threshold3 for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = if (legacyTableSource || cacheType == LookupCacheType.FULL) {
+      // legacy lookup source and full caching lookup do not support retry
+      Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    } else {
+      // the user_table_with_lookup_threshold3 will return null result before 3rd lookup
+      Seq()
+    }
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit = {
+    val maxRetryTwiceHint = getRetryLookupHint("user_table_with_lookup_threshold2", 2)
+
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $maxRetryTwiceHint T.id, T.len, T.content, D.name FROM src AS T
+                   |JOIN user_table_with_lookup_threshold2 for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testJoinTemporalTableWithLookupThresholdWithLargerRetry(): Unit = {
+    // max attempts well beyond the lookup threshold of 'user_table_with_lookup_threshold2'
+    val largerRetryHint = getRetryLookupHint("user_table_with_lookup_threshold2", 10)
+
+    val sink = new TestingAppendSink
+    tEnv
+      .sqlQuery(s"""
+                   |SELECT $largerRetryHint T.id, T.len, T.content, D.name FROM src AS T
+                   |JOIN user_table_with_lookup_threshold2 for system_time as of T.proctime AS D
+                   |ON T.id = D.id
+                   |""".stripMargin)
+      .toAppendStream[Row]
+      .addSink(sink)
+    env.execute()
+
+    val expected = Seq("1,12,Julian,Julian", "2,15,Hello,Jark", "3,15,Fabian,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
 }
 
 object LookupJoinITCase {


[flink] 02/04: [hotfix][table-planner] Use Scala isInstanceOf to check lookup function type instead of one-level parent class comparison in LookupJoinCodeGenerator

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa6d62dd6bbaa3876656a97ff519e37f9a3f0730
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Aug 9 16:59:00 2022 +0800

    [hotfix][table-planner] Use Scala isInstanceOf to check lookup function type instead of one-level parent class comparison in LookupJoinCodeGenerator
    
    This bug can be reproduced by AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry when caching is disabled in FLINK-28849
    
    This closes #20482
---
 .../apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index bc2fb42b4c4..8f1818af6a5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -250,10 +250,7 @@ object LookupJoinCodeGenerator {
         val defaultOutputDataType = callContext.getOutputDataType.get()
 
         val outputClass =
-          if (
-            udf.getClass.getSuperclass == classOf[LookupFunction]
-            || udf.getClass.getSuperclass == classOf[AsyncLookupFunction]
-          ) {
+          if (udf.isInstanceOf[LookupFunction] || udf.isInstanceOf[AsyncLookupFunction]) {
             Some(classOf[RowData])
           } else {
             toScala(extractSimpleGeneric(baseClass, udf.getClass, 0))
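
For illustration (hypothetical classes, not part of the patch): a lookup function that extends
LookupFunction through an intermediate base class is missed by the old one-level superclass
check but matched by isInstanceOf:

    import org.apache.flink.table.data.RowData
    import org.apache.flink.table.functions.LookupFunction

    object InstanceCheckDemo {
      abstract class BaseLookupFunction extends LookupFunction
      class MyLookupFunction extends BaseLookupFunction {
        override def lookup(keyRow: RowData): java.util.Collection[RowData] =
          java.util.Collections.emptyList[RowData]()
      }

      def main(args: Array[String]): Unit = {
        val udf = new MyLookupFunction
        // false: only a direct subclass of LookupFunction passes the old check
        println(udf.getClass.getSuperclass == classOf[LookupFunction])
        // true: isInstanceOf matches any depth of the LookupFunction hierarchy
        println(udf.isInstanceOf[LookupFunction])
      }
    }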


[flink] 01/04: [FLINK-28848][table-planner] Introduces LOOKUP join hint to support delayed retry for lookup join (table alias unsupported in hint)

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b25b969d4168d070e5e78ef07a6a573c9b63d95
Author: lincoln lee <li...@gmail.com>
AuthorDate: Sat Aug 6 23:42:26 2022 +0800

    [FLINK-28848][table-planner] Introduces LOOKUP join hint to support delayed retry for lookup join (table alias unsupported in hint)
    
    This is the main part of FLINK-28779 to implement FLIP-234: Support Retryable Lookup Join To Solve Delayed Updates Issue In External Systems
    
    This closes #20482
---
 .../table/planner/hint/FlinkHintStrategies.java    |  90 +++++++
 .../flink/table/planner/hint/FlinkHints.java       |   6 +-
 .../flink/table/planner/hint/JoinStrategy.java     |  11 +-
 .../table/planner/hint/LookupJoinHintOptions.java  | 139 ++++++++++
 .../plan/nodes/exec/batch/BatchExecLookupJoin.java |   2 +
 .../nodes/exec/common/CommonExecLookupJoin.java    | 102 ++++---
 .../plan/nodes/exec/spec/LookupJoinHintSpec.java   | 239 ++++++++++++++++
 .../nodes/exec/stream/StreamExecLookupJoin.java    |   5 +
 .../planner/plan/optimize/JoinHintResolver.java    |  20 +-
 .../table/planner/plan/utils/LookupJoinUtil.java   | 299 +++++++++++++++------
 .../physical/common/CommonPhysicalLookupJoin.scala |   4 +-
 .../physical/stream/StreamPhysicalLookupJoin.scala |  15 +-
 .../BatchCommonSubGraphBasedOptimizer.scala        |  14 +-
 .../optimize/CommonSubGraphBasedOptimizer.scala    |  11 +-
 .../stream/StreamPhysicalLookupJoinRule.scala      |  16 +-
 ...rJoinHintWithInvalidPropagationShuttleTest.java | 101 +------
 ...nHintWithInvalidPropagationShuttleTestBase.java | 128 +++++++++
 ...pJoinHintWithInvalidPropagationShuttleTest.java | 149 ++++++++++
 .../batch}/BroadcastJoinHintTest.java              |   2 +-
 .../hints => hints/batch}/JoinHintTestBase.java    |   4 +-
 .../batch}/NestLoopJoinHintTest.java               |   2 +-
 .../batch}/ShuffleHashJoinHintTest.java            |   2 +-
 .../batch}/ShuffleMergeJoinHintTest.java           |   2 +-
 .../exec/serde/LookupJoinHintSpecSerdeTest.java    |  51 ++++
 .../nodes/exec/spec/LookupJoinHintSpecTest.java    |  98 +++++++
 .../nodes/exec/stream/LookupJoinJsonPlanTest.java  |  68 ++++-
 .../optimize/ClearQueryBlockAliasResolverTest.java |   2 +-
 .../plan/optimize/JoinHintResolverTest.java        |   2 +-
 ...upJoinHintWithInvalidPropagationShuttleTest.xml | 127 +++++++++
 .../batch}/BroadcastJoinHintTest.xml               |   0
 .../hints => hints/batch}/NestLoopJoinHintTest.xml |   0
 .../batch}/ShuffleHashJoinHintTest.xml             |   0
 .../batch}/ShuffleMergeJoinHintTest.xml            |   0
 .../testJoinTemporalTable.out                      |   2 +-
 ....out => testJoinTemporalTableWithAsyncHint.out} |  10 +-
 ...out => testJoinTemporalTableWithAsyncHint2.out} |  10 +-
 ...=> testJoinTemporalTableWithAsyncRetryHint.out} |  14 +-
 ...> testJoinTemporalTableWithAsyncRetryHint2.out} |  14 +-
 ...testJoinTemporalTableWithProjectionPushDown.out |   2 +-
 ....out => testJoinTemporalTableWithRetryHint.out} |  13 +-
 .../plan/stream/sql/join/LookupJoinTest.xml        | 226 ++++++++++++++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   4 +-
 .../plan/stream/sql/join/LookupJoinTest.scala      | 218 ++++++++++++++-
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala |   2 +
 .../runtime/stream/sql/LookupJoinITCase.scala      |   2 +
 .../operators/join/lookup/ResultRetryStrategy.java |  69 +++++
 .../lookup/RetryableLookupFunctionDelegator.java   |  83 ++++++
 .../join/RetryableLookupFunctionDelegatorTest.java | 103 +++++++
 48 files changed, 2198 insertions(+), 285 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
index c3fea355336..8a2b285d1e8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
@@ -18,14 +18,19 @@
 
 package org.apache.flink.table.planner.hint;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
 import org.apache.calcite.rel.hint.HintOptionChecker;
 import org.apache.calcite.rel.hint.HintPredicates;
 import org.apache.calcite.rel.hint.HintStrategy;
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.util.Litmus;
 
+import java.time.Duration;
 import java.util.Collections;
 
 /** A collection of Flink style {@link HintStrategy}s. */
@@ -82,6 +87,13 @@ public abstract class FlinkHintStrategies {
                         HintStrategy.builder(HintPredicates.JOIN)
                                 .optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
                                 .build())
+                .hintStrategy(
+                        JoinStrategy.LOOKUP.getJoinHintName(),
+                        HintStrategy.builder(
+                                        HintPredicates.or(
+                                                HintPredicates.CORRELATE, HintPredicates.JOIN))
+                                .optionChecker(LOOKUP_NON_EMPTY_KV_OPTION_CHECKER)
+                                .build())
                 .build();
     }
 
@@ -107,4 +119,82 @@ public abstract class FlinkHintStrategies {
                                     + "one table or view specified in hint {}.",
                             FlinkHints.stringifyHints(Collections.singletonList(hint)),
                             hint.hintName);
+
+    private static final HintOptionChecker LOOKUP_NON_EMPTY_KV_OPTION_CHECKER =
+            (lookupHint, litmus) -> {
+                litmus.check(
+                        lookupHint.listOptions.size() == 0,
+                        "Invalid list options in LOOKUP hint, only support key-value options.");
+
+                Configuration conf = Configuration.fromMap(lookupHint.kvOptions);
+                ImmutableSet<ConfigOption> requiredKeys =
+                        LookupJoinHintOptions.getRequiredOptions();
+                litmus.check(
+                        requiredKeys.stream().allMatch(conf::contains),
+                        "Invalid LOOKUP hint: incomplete required option(s): {}",
+                        requiredKeys);
+
+                ImmutableSet<ConfigOption> supportedKeys =
+                        LookupJoinHintOptions.getSupportedOptions();
+                litmus.check(
+                        lookupHint.kvOptions.size() <= supportedKeys.size(),
+                        "Too many LOOKUP hint options {} beyond max number of supported options {}",
+                        lookupHint.kvOptions.size(),
+                        supportedKeys.size());
+
+                try {
+                    // try to validate all hint options by parsing them
+                    supportedKeys.forEach(conf::get);
+                } catch (IllegalArgumentException e) {
+                    litmus.fail("Invalid LOOKUP hint options: {}", e.getMessage());
+                }
+
+                // option value check
+                // async options are all optional
+                Boolean async = conf.get(LookupJoinHintOptions.ASYNC_LOOKUP);
+                if (Boolean.TRUE.equals(async)) {
+                    Integer capacity = conf.get(LookupJoinHintOptions.ASYNC_CAPACITY);
+                    litmus.check(
+                            null == capacity || capacity > 0,
+                            "Invalid LOOKUP hint option: {} value should be positive integer but was {}",
+                            LookupJoinHintOptions.ASYNC_CAPACITY.key(),
+                            capacity);
+                }
+
+                // retry options must either all be absent or all be present
+                String retryPredicate = conf.get(LookupJoinHintOptions.RETRY_PREDICATE);
+                LookupJoinHintOptions.RetryStrategy retryStrategy =
+                        conf.get(LookupJoinHintOptions.RETRY_STRATEGY);
+                Duration fixedDelay = conf.get(LookupJoinHintOptions.FIXED_DELAY);
+                Integer maxAttempts = conf.get(LookupJoinHintOptions.MAX_ATTEMPTS);
+                litmus.check(
+                        (null == retryPredicate
+                                        && null == retryStrategy
+                                        && null == fixedDelay
+                                        && null == fixedDelay
+                                        && null == maxAttempts)
+                                || (null != retryPredicate
+                                        && null != retryStrategy
+                                        && null != fixedDelay
+                                        && null != fixedDelay
+                                        && null != maxAttempts),
+                        "Invalid LOOKUP hint: retry options must either all be absent or all be present");
+
+                // if with retry options, all values should be valid
+                if (null != retryPredicate) {
+                    litmus.check(
+                            LookupJoinHintOptions.LOOKUP_MISS_PREDICATE.equalsIgnoreCase(
+                                    retryPredicate),
+                            "Invalid LOOKUP hint option: unsupported {} '{}', only '{}' is supported currently",
+                            LookupJoinHintOptions.RETRY_PREDICATE.key(),
+                            retryPredicate,
+                            LookupJoinHintOptions.LOOKUP_MISS_PREDICATE);
+                    litmus.check(
+                            maxAttempts > 0,
+                            "Invalid LOOKUP hint option: {} value should be positive integer but was {}",
+                            LookupJoinHintOptions.MAX_ATTEMPTS.key(),
+                            maxAttempts);
+                }
+                return true;
+            };
 }
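
As a hypothetical example of the all-or-nothing rule enforced by the checker above, a hint that
names only the retry predicate would be rejected because 'retry-strategy', 'fixed-delay' and
'max-attempts' are missing:

    // rejected by LOOKUP_NON_EMPTY_KV_OPTION_CHECKER: incomplete retry options
    val invalidHintQuery =
      """SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss') */ O.order_id, C.name
        |FROM Orders AS O
        |JOIN Customers FOR SYSTEM_TIME AS OF O.proctime AS C ON O.cust_id = C.id""".stripMargin
    // accepted variant: also set 'retry-strategy'='fixed_delay', 'fixed-delay'='5 ms', 'max-attempts'='3'
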
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
index 20f0ee2a037..815237aae0c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSnapshot;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Collections;
@@ -107,9 +108,10 @@ public abstract class FlinkHints {
     }
 
     public static boolean canTransposeToTableScan(RelNode node) {
-        // TODO support look up join
         return node instanceof LogicalProject // computed column on table
-                || node instanceof LogicalFilter;
+                || node instanceof LogicalFilter
+                // TODO support lookup join hint with alias name in FLINK-28850
+                || node instanceof LogicalSnapshot;
     }
 
     /** Returns the qualified name of a table scan, otherwise returns empty. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
index cf6345167e0..87795fda231 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java
@@ -44,7 +44,10 @@ public enum JoinStrategy {
      * Instructs the optimizer to use nest loop join strategy. If both sides are specified in this
      * hint, the side that is first written will be treated as the build side.
      */
-    NEST_LOOP("NEST_LOOP");
+    NEST_LOOP("NEST_LOOP"),
+
+    /** Instructs the optimizer to use the lookup join strategy. Only key-value hint options are accepted. */
+    LOOKUP("LOOKUP");
 
     private final String joinHintName;
 
@@ -81,7 +84,13 @@ public enum JoinStrategy {
             case BROADCAST:
             case NEST_LOOP:
                 return options.size() > 0;
+            case LOOKUP:
+                return null == options || options.size() == 0;
         }
         return false;
     }
+
+    public static boolean isLookupHint(String hintName) {
+        return isJoinStrategy(hintName) && JoinStrategy.valueOf(hintName) == LOOKUP;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/LookupJoinHintOptions.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/LookupJoinHintOptions.java
new file mode 100644
index 00000000000..b634572035d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/LookupJoinHintOptions.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.hint;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** {@link LookupJoinHintOptions} defines the valid hint options of the LOOKUP join hint. */
+@Internal
+public class LookupJoinHintOptions {
+    private LookupJoinHintOptions() {}
+
+    public static final ConfigOption<String> LOOKUP_TABLE =
+            key("table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The table name of the lookup source.");
+
+    public static final ConfigOption<Boolean> ASYNC_LOOKUP =
+            key("async")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Value can be 'true' or 'false' to suggest that the planner choose the"
+                                    + " corresponding lookup function. If the backing lookup source does not"
+                                    + " support the suggested lookup mode, the hint takes no effect.");
+
+    public static final ConfigOption<ExecutionConfigOptions.AsyncOutputMode> ASYNC_OUTPUT_MODE =
+            key("output-mode")
+                    .enumType(ExecutionConfigOptions.AsyncOutputMode.class)
+                    .defaultValue(
+                            ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE
+                                    .defaultValue())
+                    .withDescription(
+                            "Output mode for asynchronous operations, which will be converted to {@see AsyncDataStream.OutputMode}; ORDERED by default. "
+                                    + "If set to ALLOW_UNORDERED, the planner will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not "
+                                    + "affect the correctness of the result, otherwise ORDERED will still be used.");
+
+    public static final ConfigOption<Integer> ASYNC_CAPACITY =
+            key("capacity")
+                    .intType()
+                    .defaultValue(
+                            ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY
+                                    .defaultValue())
+                    .withDescription(
+                            "The max number of async I/O operations that the async lookup join can trigger.");
+
+    public static final ConfigOption<Duration> ASYNC_TIMEOUT =
+            key("timeout")
+                    .durationType()
+                    .defaultValue(
+                            ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT.defaultValue())
+                    .withDescription(
+                            "Timeout from first invoke to final completion of asynchronous operation, may include multiple"
+                                    + " retries, and will be reset in case of failover.");
+    public static final ConfigOption<String> RETRY_PREDICATE =
+            key("retry-predicate")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A predicate that expresses the retry condition, can be 'lookup_miss' which"
+                                    + " enables retry if the lookup result is empty.");
+
+    public static final ConfigOption<RetryStrategy> RETRY_STRATEGY =
+            key("retry-strategy")
+                    .enumType(RetryStrategy.class)
+                    .noDefaultValue()
+                    .withDescription("The retry strategy name, can be 'fixed_delay' for now.");
+
+    public static final ConfigOption<Duration> FIXED_DELAY =
+            key("fixed-delay")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("Delay time for the 'fixed-delay' retry strategy.");
+
+    public static final ConfigOption<Integer> MAX_ATTEMPTS =
+            key("max-attempts")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Max attempt number of the 'fixed-delay' retry strategy.");
+
+    public static final String LOOKUP_MISS_PREDICATE = "lookup_miss";
+
+    private static final Set<ConfigOption> requiredKeys = new HashSet<>();
+    private static final Set<ConfigOption> supportedKeys = new HashSet<>();
+
+    static {
+        requiredKeys.add(LOOKUP_TABLE);
+
+        supportedKeys.add(LOOKUP_TABLE);
+        supportedKeys.add(ASYNC_LOOKUP);
+        supportedKeys.add(ASYNC_CAPACITY);
+        supportedKeys.add(ASYNC_TIMEOUT);
+        supportedKeys.add(ASYNC_OUTPUT_MODE);
+        supportedKeys.add(RETRY_PREDICATE);
+        supportedKeys.add(RETRY_STRATEGY);
+        supportedKeys.add(FIXED_DELAY);
+        supportedKeys.add(MAX_ATTEMPTS);
+    }
+
+    public static ImmutableSet<ConfigOption> getRequiredOptions() {
+        return ImmutableSet.copyOf(requiredKeys);
+    }
+
+    public static ImmutableSet<ConfigOption> getSupportedOptions() {
+        return ImmutableSet.copyOf(supportedKeys);
+    }
+
+    /** Supported retry strategies. */
+    public enum RetryStrategy {
+        /** Fixed-delay retry strategy. */
+        FIXED_DELAY
+    }
+}
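
For illustration only (not part of the patch), a LOOKUP hint combining the async and retry options defined above might look as follows; the tables, columns and option values are hypothetical, and Orders is assumed to carry a processing-time attribute proc_time:

    SELECT /*+ LOOKUP('table'='Customers',
                      'async'='true', 'output-mode'='allow_unordered', 'capacity'='100', 'timeout'='180s',
                      'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay',
                      'fixed-delay'='10s', 'max-attempts'='3') */
           o.id, Customers.name
    FROM Orders AS o
    JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time
      ON o.cust_id = Customers.id;

The 'table' option is required and must match the name of the lookup table (table aliases are not supported yet, see FLINK-28850); all other options are optional.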
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
index 903e8eb2d7e..2c07e3f03ac 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
@@ -66,6 +66,8 @@ public class BatchExecLookupJoin extends CommonExecLookupJoin implements BatchEx
                 ChangelogMode.insertOnly(),
                 Collections.singletonList(inputProperty),
                 outputType,
+                // batch lookup join does not support the LOOKUP hint currently
+                null,
                 description);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index facbb5ab50d..52192ee79ad 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -58,6 +58,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
@@ -90,6 +91,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.plan.RelOptTable;
@@ -165,6 +167,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
     public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable";
 
     public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode";
+    public static final String FIELD_NAME_JOIN_HINT = "joinHint";
 
     @JsonProperty(FIELD_NAME_JOIN_TYPE)
     private final FlinkJoinType joinType;
@@ -192,6 +195,10 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
     private final ChangelogMode inputChangelogMode;
 
+    @JsonProperty(FIELD_NAME_JOIN_HINT)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final @Nullable LookupJoinHintSpec joinHintSpec;
+
     protected CommonExecLookupJoin(
             int id,
             ExecNodeContext context,
@@ -206,6 +213,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
             ChangelogMode inputChangelogMode,
             List<InputProperty> inputProperties,
             RowType outputType,
+            @Nullable LookupJoinHintSpec lookupJoinHintSpec,
             String description) {
         super(id, context, persistedConfig, inputProperties, outputType, description);
         checkArgument(inputProperties.size() == 1);
@@ -216,6 +224,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
         this.projectionOnTemporalTable = projectionOnTemporalTable;
         this.filterOnTemporalTable = filterOnTemporalTable;
         this.inputChangelogMode = inputChangelogMode;
+        this.joinHintSpec = lookupJoinHintSpec;
     }
 
     public TemporalTableSourceSpec getTemporalTableSourceSpec() {
@@ -238,31 +247,31 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
         RowType resultRowType = (RowType) getOutputType();
         validateLookupKeyType(lookupKeys, inputRowType, tableSourceRowType);
 
+        UserDefinedFunction lookupFunction =
+                LookupJoinUtil.getLookupFunction(
+                        temporalTable,
+                        lookupKeys.keySet(),
+                        planner.getFlinkContext().getClassLoader(),
+                        joinHintSpec,
+                        upsertMaterialize);
+        UserDefinedFunctionHelper.prepareInstance(config, lookupFunction);
+
         boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
-        boolean isAsyncEnabled = false;
-        UserDefinedFunction userDefinedFunction;
-        boolean inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT);
+        boolean isAsyncEnabled = lookupFunction instanceof AsyncTableFunction;
 
         Transformation<RowData> inputTransformation =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
 
-        // upsertMaterialize only works on sync lookup mode, async lookup is unsupported.
-        if (!inputInsertOnly && upsertMaterialize) {
-            userDefinedFunction =
-                    LookupJoinUtil.getLookupFunction(
-                            temporalTable,
-                            lookupKeys.keySet(),
-                            planner.getFlinkContext().getClassLoader(),
-                            true);
-            UserDefinedFunctionHelper.prepareInstance(config, userDefinedFunction);
-
+        if (upsertMaterialize) {
+            // upsertMaterialize only works on sync lookup mode, async lookup is unsupported.
+            assert !isAsyncEnabled && !inputChangelogMode.containsOnly(RowKind.INSERT);
             return createSyncLookupJoinWithState(
                     inputTransformation,
                     temporalTable,
                     config,
                     planner.getFlinkContext().getClassLoader(),
                     lookupKeys,
-                    (TableFunction<Object>) userDefinedFunction,
+                    (TableFunction<Object>) lookupFunction,
                     planner.createRelBuilder(),
                     inputRowType,
                     tableSourceRowType,
@@ -271,15 +280,6 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                     planner.getExecEnv().getConfig().isObjectReuseEnabled(),
                     lookupKeyContainsPrimaryKey);
         } else {
-            userDefinedFunction =
-                    LookupJoinUtil.getLookupFunction(
-                            temporalTable,
-                            lookupKeys.keySet(),
-                            planner.getFlinkContext().getClassLoader());
-            if (userDefinedFunction instanceof AsyncTableFunction) {
-                isAsyncEnabled = true;
-            }
-            UserDefinedFunctionHelper.prepareInstance(config, userDefinedFunction);
             StreamOperatorFactory<RowData> operatorFactory;
             if (isAsyncEnabled) {
                 operatorFactory =
@@ -288,7 +288,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                                 config,
                                 planner.getFlinkContext().getClassLoader(),
                                 lookupKeys,
-                                (AsyncTableFunction<Object>) userDefinedFunction,
+                                (AsyncTableFunction<Object>) lookupFunction,
                                 planner.createRelBuilder(),
                                 inputRowType,
                                 tableSourceRowType,
@@ -301,7 +301,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                                 config,
                                 planner.getFlinkContext().getClassLoader(),
                                 lookupKeys,
-                                (TableFunction<Object>) userDefinedFunction,
+                                (TableFunction<Object>) lookupFunction,
                                 planner.createRelBuilder(),
                                 inputRowType,
                                 tableSourceRowType,
@@ -450,6 +450,20 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
         }
     }
 
+    private LookupJoinUtil.AsyncLookupOptions getAsyncOptions(ExecNodeConfig config) {
+        if (joinHintSpec != null) {
+            // hint options take precedence over the job configuration
+            return new LookupJoinUtil.AsyncLookupOptions(
+                    joinHintSpec.getAsyncCapacity(),
+                    joinHintSpec.getAsyncTimeout(),
+                    joinHintSpec.getAsyncOutputMode());
+        }
+        return new LookupJoinUtil.AsyncLookupOptions(
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY),
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT).toMillis(),
+                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE));
+    }
+
     @SuppressWarnings("unchecked")
     private StreamOperatorFactory<RowData> createAsyncLookupJoin(
             RelOptTable temporalTable,
@@ -463,12 +477,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
             RowType resultRowType,
             boolean isLeftOuterJoin) {
 
-        int asyncBufferCapacity =
-                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
-        long asyncTimeout =
-                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT).toMillis();
-        ExecutionConfigOptions.AsyncOutputMode asyncOutputMode =
-                config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE);
+        LookupJoinUtil.AsyncLookupOptions asyncLookupOptions = getAsyncOptions(config);
 
         DataTypeFactory dataTypeFactory =
                 ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
@@ -521,7 +530,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                             generatedResultFuture,
                             InternalSerializers.create(rightRowType),
                             isLeftOuterJoin,
-                            asyncBufferCapacity);
+                            asyncLookupOptions.asyncBufferCapacity);
         } else {
             // right type is the same as table source row type, because no calc after temporal table
             asyncFunc =
@@ -531,11 +540,32 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
                             generatedResultFuture,
                             InternalSerializers.create(rightRowType),
                             isLeftOuterJoin,
-                            asyncBufferCapacity);
+                            asyncLookupOptions.asyncBufferCapacity);
+        }
+        // Why not implement the async retry directly in AsyncLookupFunction? Because actively
+        // sleeping on the async callback thread would occupy task CPU time, while the retry
+        // support in the async DataStream API uses the processing time service and therefore
+        // does not occupy the callback thread. Both AsyncLookupFunction and AsyncTableFunction
+        // can support retry this way.
+        if (null != joinHintSpec) {
+            // To keep the code simple, we do not check whether the ResultRetryStrategy is
+            // NO_RETRY_STRATEGY here, because AsyncWaitOperator has a short-path optimization
+            // for the no-retry case.
+            return new AsyncWaitOperatorFactory<>(
+                    asyncFunc,
+                    asyncLookupOptions.asyncTimeout,
+                    asyncLookupOptions.asyncBufferCapacity,
+                    convert(asyncLookupOptions.asyncOutputMode),
+                    joinHintSpec.toRetryStrategy());
+        } else {
+            return new AsyncWaitOperatorFactory<>(
+                    asyncFunc,
+                    asyncLookupOptions.asyncTimeout,
+                    asyncLookupOptions.asyncBufferCapacity,
+                    convert(asyncLookupOptions.asyncOutputMode));
         }
-
-        return new AsyncWaitOperatorFactory<>(
-                asyncFunc, asyncTimeout, asyncBufferCapacity, convert(asyncOutputMode));
     }
 
     private AsyncDataStream.OutputMode convert(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpec.java
new file mode 100644
index 00000000000..796e7a3ea23
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpec.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.retryable.RetryPredicates;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.hint.LookupJoinHintOptions;
+import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rel.hint.RelHint;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_CAPACITY;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_LOOKUP;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_OUTPUT_MODE;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.ASYNC_TIMEOUT;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.FIXED_DELAY;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.MAX_ATTEMPTS;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.RETRY_PREDICATE;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.RETRY_STRATEGY;
+import static org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy.NO_RETRY_STRATEGY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * LookupJoinHintSpec describes the user-specified hint options for a lookup join.
+ *
+ * <p>This class corresponds to the {@link
+ * org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin} exec node.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LookupJoinHintSpec {
+    public static final String FIELD_NAME_ASYNC = "async";
+    public static final String FIELD_NAME_ASYNC_OUTPUT_MODE = "output-mode";
+    public static final String FIELD_NAME_ASYNC_CAPACITY = "capacity";
+    public static final String FIELD_NAME_ASYNC_TIMEOUT = "timeout";
+    public static final String FIELD_NAME_RETRY_PREDICATE = "retry-predicate";
+    public static final String FIELD_NAME_RETRY_STRATEGY = "retry-strategy";
+    public static final String FIELD_NAME_RETRY_FIXED_DELAY = "fixed-delay";
+    public static final String FIELD_NAME_RETRY_MAX_ATTEMPTS = "max-attempts";
+
+    @JsonProperty(FIELD_NAME_ASYNC)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final @Nullable Boolean async;
+
+    @JsonProperty(FIELD_NAME_ASYNC_OUTPUT_MODE)
+    private final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode;
+
+    @JsonProperty(FIELD_NAME_ASYNC_CAPACITY)
+    private final Integer asyncCapacity;
+
+    @JsonProperty(FIELD_NAME_ASYNC_TIMEOUT)
+    private final Long asyncTimeout;
+
+    @JsonProperty(FIELD_NAME_RETRY_PREDICATE)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final @Nullable String retryPredicate;
+
+    @JsonProperty(FIELD_NAME_RETRY_STRATEGY)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final @Nullable LookupJoinHintOptions.RetryStrategy retryStrategy;
+
+    @JsonProperty(FIELD_NAME_RETRY_FIXED_DELAY)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final @Nullable Long retryFixedDelay;
+
+    @JsonProperty(FIELD_NAME_RETRY_MAX_ATTEMPTS)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final @Nullable Integer retryMaxAttempts;
+
+    @JsonCreator
+    public LookupJoinHintSpec(
+            @JsonProperty(FIELD_NAME_ASYNC) @Nullable Boolean async,
+            @JsonProperty(FIELD_NAME_ASYNC_OUTPUT_MODE) @Nullable
+                    ExecutionConfigOptions.AsyncOutputMode asyncOutputMode,
+            @JsonProperty(FIELD_NAME_ASYNC_CAPACITY) Integer asyncCapacity,
+            @JsonProperty(FIELD_NAME_ASYNC_TIMEOUT) Long asyncTimeout,
+            @JsonProperty(FIELD_NAME_RETRY_PREDICATE) String retryPredicate,
+            @JsonProperty(FIELD_NAME_RETRY_STRATEGY) @Nullable
+                    LookupJoinHintOptions.RetryStrategy retryStrategy,
+            @JsonProperty(FIELD_NAME_RETRY_FIXED_DELAY) @Nullable Long retryFixedDelay,
+            @JsonProperty(FIELD_NAME_RETRY_MAX_ATTEMPTS) @Nullable Integer retryMaxAttempts) {
+        this.async = async;
+        this.asyncOutputMode = checkNotNull(asyncOutputMode);
+        this.asyncCapacity = checkNotNull(asyncCapacity);
+        this.asyncTimeout = checkNotNull(asyncTimeout);
+        this.retryPredicate = retryPredicate;
+        this.retryStrategy = retryStrategy;
+        this.retryFixedDelay = retryFixedDelay;
+        this.retryMaxAttempts = retryMaxAttempts;
+    }
+
+    @JsonIgnore
+    @Nullable
+    public Boolean getAsync() {
+        return async;
+    }
+
+    @JsonIgnore
+    public Boolean isAsync() {
+        return Boolean.TRUE.equals(async);
+    }
+
+    @JsonIgnore
+    public ExecutionConfigOptions.AsyncOutputMode getAsyncOutputMode() {
+        return asyncOutputMode;
+    }
+
+    @JsonIgnore
+    public Integer getAsyncCapacity() {
+        return asyncCapacity;
+    }
+
+    @JsonIgnore
+    public Long getAsyncTimeout() {
+        return asyncTimeout;
+    }
+
+    @JsonIgnore
+    public String getRetryPredicate() {
+        return retryPredicate;
+    }
+
+    @JsonIgnore
+    @Nullable
+    public LookupJoinHintOptions.RetryStrategy getRetryStrategy() {
+        return retryStrategy;
+    }
+
+    @JsonIgnore
+    @Nullable
+    public Long getRetryFixedDelay() {
+        return retryFixedDelay;
+    }
+
+    @JsonIgnore
+    @Nullable
+    public Integer getRetryMaxAttempts() {
+        return retryMaxAttempts;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LookupJoinHintSpec that = (LookupJoinHintSpec) o;
+        return Objects.equals(async, that.async)
+                && asyncOutputMode == that.asyncOutputMode
+                && Objects.equals(asyncCapacity, that.asyncCapacity)
+                && Objects.equals(asyncTimeout, that.asyncTimeout)
+                && Objects.equals(retryPredicate, that.retryPredicate)
+                && retryStrategy == that.retryStrategy
+                && Objects.equals(retryFixedDelay, that.retryFixedDelay)
+                && Objects.equals(retryMaxAttempts, that.retryMaxAttempts);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                async,
+                asyncOutputMode,
+                asyncCapacity,
+                asyncTimeout,
+                retryPredicate,
+                retryStrategy,
+                retryFixedDelay,
+                retryMaxAttempts);
+    }
+
+    /**
+     * Converts the given LOOKUP join hint to a {@link LookupJoinHintSpec}.
+     *
+     * @param lookupJoinHint the LOOKUP join hint whose kv-options will be read
+     */
+    @JsonIgnore
+    public static LookupJoinHintSpec fromJoinHint(RelHint lookupJoinHint) {
+        Configuration conf = Configuration.fromMap(lookupJoinHint.kvOptions);
+        Duration fixedDelay = conf.get(FIXED_DELAY);
+        return new LookupJoinHintSpec(
+                conf.get(ASYNC_LOOKUP),
+                conf.get(ASYNC_OUTPUT_MODE),
+                conf.get(ASYNC_CAPACITY),
+                conf.get(ASYNC_TIMEOUT).toMillis(),
+                conf.get(RETRY_PREDICATE),
+                conf.get(RETRY_STRATEGY),
+                fixedDelay != null ? fixedDelay.toMillis() : null,
+                conf.get(MAX_ATTEMPTS));
+    }
+
+    /**
+     * Converts this {@link LookupJoinHintSpec} to a {@link ResultRetryStrategy} in a best-effort
+     * manner. If an invalid {@link LookupJoinHintOptions#RETRY_PREDICATE} or {@link
+     * LookupJoinHintOptions#RETRY_STRATEGY} is given, {@link
+     * ResultRetryStrategy#NO_RETRY_STRATEGY} will be returned.
+     */
+    @JsonIgnore
+    public ResultRetryStrategy toRetryStrategy() {
+        if (null == retryPredicate
+                || !LookupJoinHintOptions.LOOKUP_MISS_PREDICATE.equalsIgnoreCase(retryPredicate)
+                || retryStrategy != LookupJoinHintOptions.RetryStrategy.FIXED_DELAY) {
+            return NO_RETRY_STRATEGY;
+        }
+        // retry option values have been validated by hint checker
+        return ResultRetryStrategy.fixedDelayRetry(
+                this.retryMaxAttempts,
+                this.retryFixedDelay,
+                RetryPredicates.EMPTY_RESULT_PREDICATE);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
index 3ae030e259e..6b83b3d380c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
 import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
@@ -79,6 +80,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
             RowType outputType,
             boolean lookupKeyContainsPrimaryKey,
             boolean upsertMaterialize,
+            @Nullable LookupJoinHintSpec lookupJoinHintSpec,
             String description) {
         this(
                 ExecNodeContext.newNodeId(),
@@ -95,6 +97,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
                 outputType,
                 lookupKeyContainsPrimaryKey,
                 upsertMaterialize,
+                lookupJoinHintSpec,
                 description);
     }
 
@@ -119,6 +122,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
             @JsonProperty(FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY)
                     boolean lookupKeyContainsPrimaryKey,
             @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE) boolean upsertMaterialize,
+            @JsonProperty(FIELD_NAME_JOIN_HINT) @Nullable LookupJoinHintSpec lookupJoinHintSpec,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(
                 id,
@@ -133,6 +137,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream
                 inputChangelogMode,
                 inputProperties,
                 outputType,
+                lookupJoinHintSpec,
                 description);
         this.lookupKeyContainsPrimaryKey = lookupKeyContainsPrimaryKey;
         this.upsertMaterialize = upsertMaterialize;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
index 407d2ce79ab..ba11c29c1c9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.optimize;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.hint.JoinStrategy;
@@ -28,6 +29,7 @@ import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.commons.lang3.StringUtils;
 
@@ -39,6 +41,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
+import static org.apache.flink.table.planner.hint.LookupJoinHintOptions.LOOKUP_TABLE;
 
 /**
  * Resolve and validate the join hints.
@@ -73,6 +76,11 @@ public class JoinHintResolver extends RelShuttleImpl {
         return visitBiRel(join);
     }
 
+    @Override
+    public RelNode visit(LogicalCorrelate correlate) {
+        return visitBiRel(correlate);
+    }
+
     private RelNode visitBiRel(BiRel biRel) {
         Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
         Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
@@ -83,7 +91,17 @@ public class JoinHintResolver extends RelShuttleImpl {
         List<RelHint> newHints = new ArrayList<>();
 
         for (RelHint hint : oldHints) {
-            if (JoinStrategy.isJoinStrategy(hint.hintName)) {
+            if (JoinStrategy.isLookupHint(hint.hintName)) {
+                allHints.add(trimInheritPath(hint));
+                Configuration conf = Configuration.fromMap(hint.kvOptions);
+                // hint option checker has done the validation
+                String lookupTable = conf.get(LOOKUP_TABLE);
+                assert null != lookupTable;
+                if (rightName.isPresent() && matchIdentifier(lookupTable, rightName.get())) {
+                    validHints.add(trimInheritPath(hint));
+                    newHints.add(hint);
+                }
+            } else if (JoinStrategy.isJoinStrategy(hint.hintName)) {
                 allHints.add(trimInheritPath(hint));
                 // the declared table name or query block name is replaced by
                 // JoinStrategy#LEFT_INPUT or JoinStrategy#RIGHT_INPUT
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 5545dda5da6..833eb1908a7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
@@ -34,8 +35,10 @@ import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupP
 import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpec;
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
@@ -45,6 +48,8 @@ import org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoad
 import org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
 import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
 import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
+import org.apache.flink.table.runtime.operators.join.lookup.RetryableLookupFunctionDelegator;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.sources.LookupableTableSource;
@@ -68,6 +73,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy.NO_RETRY_STRATEGY;
+
 /** Utilities for lookup joins using {@link LookupTableSource}. */
 @Internal
 public final class LookupJoinUtil {
@@ -160,6 +167,27 @@ public final class LookupJoinUtil {
         // no instantiation
     }
 
+    /** AsyncLookupOptions includes async related options. */
+    public static class AsyncLookupOptions {
+        public final int asyncBufferCapacity;
+        public final long asyncTimeout;
+        public final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode;
+
+        public AsyncLookupOptions(
+                int asyncBufferCapacity,
+                long asyncTimeout,
+                ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) {
+            this.asyncBufferCapacity = asyncBufferCapacity;
+            this.asyncTimeout = asyncTimeout;
+            this.asyncOutputMode = asyncOutputMode;
+        }
+    }
+
+    private static class LookupFunctionCandidates {
+        UserDefinedFunction syncLookupFunction;
+        UserDefinedFunction asyncLookupFunction;
+    }
+
     /** Gets lookup keys sorted by index in ascending order. */
     public static int[] getOrderedLookupKeys(Collection<Integer> allLookupKeys) {
         List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys);
@@ -167,106 +195,209 @@ public final class LookupJoinUtil {
         return lookupKeyIndicesInOrder.stream().mapToInt(Integer::intValue).toArray();
     }
 
-    /** Gets LookupFunction from temporal table according to the given lookup keys. */
-    public static UserDefinedFunction getLookupFunction(
-            RelOptTable temporalTable, Collection<Integer> lookupKeys, ClassLoader classLoader) {
-        return getLookupFunction(temporalTable, lookupKeys, classLoader, false);
-    }
-
     /**
-     * Gets LookupFunction from temporal table according to the given lookup keys. If specifies
-     * requireSyncLookup, then only sync function will be created or raise an error if not
-     * implemented.
+     * Gets the lookup function from the temporal table according to the given lookup keys, with a
+     * preferred (or, when the planner demands it, required) lookup mode.
+     *
+     * @return the UserDefinedFunction for the preferred or required lookup mode
      */
     public static UserDefinedFunction getLookupFunction(
             RelOptTable temporalTable,
             Collection<Integer> lookupKeys,
             ClassLoader classLoader,
-            boolean requireSyncLookup) {
+            LookupJoinHintSpec joinHintSpec,
+            boolean upsertMaterialize) {
+        // async & sync lookup candidates
+        LookupFunctionCandidates lookupFunctionCandidates = new LookupFunctionCandidates();
+
+        // prefer (not require) by default
+        boolean asyncLookup = LookupJoinUtil.preferAsync(joinHintSpec);
+        boolean require = false;
+        if (upsertMaterialize) {
+            // upsertMaterialize only works on sync lookup mode, async lookup is unsupported.
+            require = true;
+            asyncLookup = false;
+        }
 
         int[] lookupKeyIndicesInOrder = getOrderedLookupKeys(lookupKeys);
-
         if (temporalTable instanceof TableSourceTable) {
-            LookupTableSource.LookupRuntimeProvider provider =
-                    getLookupRuntimeProvider(temporalTable, lookupKeys);
-
-            // TODO this method will be refactored in FLINK-28848
-            if (requireSyncLookup
-                    && !(provider instanceof TableFunctionProvider)
-                    && !(provider instanceof LookupFunctionProvider)) {
-                throw new TableException(
-                        String.format(
-                                "Require a synchronous lookup function due to planner's requirement but no "
-                                        + "available functions in TableSourceTable: %s, please check the code to ensure "
-                                        + "a proper LookupFunctionProvider or TableFunctionProvider is specified.",
-                                temporalTable.getQualifiedName()));
+            findLookupFunctionFromNewSource(
+                    (TableSourceTable) temporalTable,
+                    lookupKeyIndicesInOrder,
+                    joinHintSpec,
+                    classLoader,
+                    lookupFunctionCandidates);
+        }
+        if (temporalTable instanceof LegacyTableSourceTable) {
+            findLookupFunctionFromLegacySource(
+                    (LegacyTableSourceTable<?>) temporalTable,
+                    lookupKeyIndicesInOrder,
+                    lookupFunctionCandidates);
+        }
+        UserDefinedFunction selectLookupFunction =
+                selectLookupFunction(
+                        lookupFunctionCandidates.asyncLookupFunction,
+                        lookupFunctionCandidates.syncLookupFunction,
+                        require,
+                        asyncLookup);
+
+        if (null == selectLookupFunction) {
+            StringBuilder errorMsg = new StringBuilder();
+            if (require) {
+                errorMsg.append("The planner requires ")
+                        .append(asyncLookup ? "an async" : "a sync")
+                        .append(" lookup function, but ");
             }
-            if (provider instanceof LookupFunctionProvider) {
-                if (provider instanceof PartialCachingLookupProvider) {
-                    PartialCachingLookupProvider partialCachingLookupProvider =
-                            (PartialCachingLookupProvider) provider;
-                    return new CachingLookupFunction(
-                            partialCachingLookupProvider.getCache(),
-                            partialCachingLookupProvider.createLookupFunction());
-                } else if (provider instanceof FullCachingLookupProvider) {
-                    FullCachingLookupProvider fullCachingLookupProvider =
-                            (FullCachingLookupProvider) provider;
-                    RowType tableSourceRowType =
-                            FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
-                    LookupFullCache fullCache =
-                            createFullCache(
-                                    fullCachingLookupProvider,
-                                    lookupKeyIndicesInOrder,
-                                    classLoader,
-                                    tableSourceRowType);
-                    return new CachingLookupFunction(
-                            fullCache, fullCachingLookupProvider.createLookupFunction());
-                }
-                return ((LookupFunctionProvider) provider).createLookupFunction();
-            } else if (provider instanceof AsyncLookupFunctionProvider) {
-                if (provider instanceof PartialCachingAsyncLookupProvider) {
-                    PartialCachingAsyncLookupProvider partialCachingLookupProvider =
-                            (PartialCachingAsyncLookupProvider) provider;
-                    return new CachingAsyncLookupFunction(
-                            partialCachingLookupProvider.getCache(),
-                            partialCachingLookupProvider.createAsyncLookupFunction());
-                }
-                return ((AsyncLookupFunctionProvider) provider).createAsyncLookupFunction();
-            } else if (provider instanceof TableFunctionProvider) {
-                return ((TableFunctionProvider<?>) provider).createTableFunction();
-            } else if (provider instanceof AsyncTableFunctionProvider) {
-                return ((AsyncTableFunctionProvider<?>) provider).createAsyncTableFunction();
+            errorMsg.append("table ")
+                    .append(temporalTable.getQualifiedName())
+                    .append(
+                            " does not offer a valid lookup function as either a TableSourceTable or a LegacyTableSourceTable.");
+            throw new TableException(errorMsg.toString());
+        }
+        return selectLookupFunction;
+    }
+
+    /**
+     * Wraps LookupFunction into a RetryableLookupFunctionDelegator to support retry. Note: only
+     * LookupFunction is supported.
+     */
+    private static LookupFunction wrapSyncRetryDelegator(
+            LookupFunctionProvider provider, LookupJoinHintSpec joinHintSpec) {
+        if (joinHintSpec != null) {
+            ResultRetryStrategy retryStrategy = joinHintSpec.toRetryStrategy();
+            if (retryStrategy != NO_RETRY_STRATEGY) {
+                return new RetryableLookupFunctionDelegator(
+                        provider.createLookupFunction(), retryStrategy);
             }
         }
+        return provider.createLookupFunction();
+    }
 
-        if (temporalTable instanceof LegacyTableSourceTable) {
-            String[] lookupFieldNamesInOrder =
-                    IntStream.of(lookupKeyIndicesInOrder)
-                            .mapToObj(temporalTable.getRowType().getFieldNames()::get)
-                            .toArray(String[]::new);
-            LegacyTableSourceTable<?> legacyTableSourceTable =
-                    (LegacyTableSourceTable<?>) temporalTable;
-            LookupableTableSource<?> tableSource =
-                    (LookupableTableSource<?>) legacyTableSourceTable.tableSource();
-            if (!requireSyncLookup && tableSource.isAsyncEnabled()) {
-                return tableSource.getAsyncLookupFunction(lookupFieldNamesInOrder);
+    private static void findLookupFunctionFromNewSource(
+            TableSourceTable temporalTable,
+            int[] lookupKeyIndicesInOrder,
+            LookupJoinHintSpec joinHintSpec,
+            ClassLoader classLoader,
+            LookupFunctionCandidates lookupFunctionCandidates) {
+        UserDefinedFunction syncLookupFunction = null;
+        UserDefinedFunction asyncLookupFunction = null;
+
+        // TODO: support nested lookup keys in the future,
+        //  currently we only support top-level lookup keys
+        int[][] indices =
+                IntStream.of(lookupKeyIndicesInOrder)
+                        .mapToObj(i -> new int[] {i})
+                        .toArray(int[][]::new);
+
+        LookupTableSource tableSource = (LookupTableSource) temporalTable.tableSource();
+        LookupRuntimeProviderContext providerContext = new LookupRuntimeProviderContext(indices);
+        LookupTableSource.LookupRuntimeProvider provider =
+                tableSource.getLookupRuntimeProvider(providerContext);
+
+        if (provider instanceof LookupFunctionProvider) {
+            if (provider instanceof PartialCachingLookupProvider) {
+                PartialCachingLookupProvider partialCachingLookupProvider =
+                        (PartialCachingLookupProvider) provider;
+                syncLookupFunction =
+                        new CachingLookupFunction(
+                                partialCachingLookupProvider.getCache(),
+                                wrapSyncRetryDelegator(partialCachingLookupProvider, joinHintSpec));
+            } else if (provider instanceof FullCachingLookupProvider) {
+                FullCachingLookupProvider fullCachingLookupProvider =
+                        (FullCachingLookupProvider) provider;
+                RowType tableSourceRowType =
+                        FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
+                LookupFullCache fullCache =
+                        createFullCache(
+                                fullCachingLookupProvider,
+                                lookupKeyIndicesInOrder,
+                                classLoader,
+                                tableSourceRowType);
+                syncLookupFunction =
+                        new CachingLookupFunction(
+                                fullCache, fullCachingLookupProvider.createLookupFunction());
             } else {
-                UserDefinedFunction lookupFunc =
-                        tableSource.getLookupFunction(lookupFieldNamesInOrder);
-                if (null == lookupFunc) {
-                    throw new TableException(
-                            String.format(
-                                    "Require a synchronous TableFunction due to planner's requirement but can not create one from "
-                                            + "LegacyTableSourceTable: %s, please check the code to ensure getLookupFunction is implemented.",
-                                    temporalTable.getQualifiedName()));
-                }
-                return lookupFunc;
+                syncLookupFunction =
+                        wrapSyncRetryDelegator((LookupFunctionProvider) provider, joinHintSpec);
             }
         }
-        throw new TableException(
-                String.format(
-                        "table %s is neither TableSourceTable not LegacyTableSourceTable",
-                        temporalTable.getQualifiedName()));
+        if (provider instanceof AsyncLookupFunctionProvider) {
+            if (provider instanceof PartialCachingAsyncLookupProvider) {
+                PartialCachingAsyncLookupProvider partialCachingLookupProvider =
+                        (PartialCachingAsyncLookupProvider) provider;
+                asyncLookupFunction =
+                        new CachingAsyncLookupFunction(
+                                partialCachingLookupProvider.getCache(),
+                                partialCachingLookupProvider.createAsyncLookupFunction());
+            } else {
+                asyncLookupFunction =
+                        ((AsyncLookupFunctionProvider) provider).createAsyncLookupFunction();
+            }
+        }
+        if (provider instanceof TableFunctionProvider) {
+            syncLookupFunction = ((TableFunctionProvider<?>) provider).createTableFunction();
+        }
+        if (provider instanceof AsyncTableFunctionProvider) {
+            asyncLookupFunction =
+                    ((AsyncTableFunctionProvider<?>) provider).createAsyncTableFunction();
+        }
+        setLookupFunctions(lookupFunctionCandidates, asyncLookupFunction, syncLookupFunction);
+    }
+
+    private static void setLookupFunctions(
+            LookupFunctionCandidates lookupFunctionCandidates,
+            UserDefinedFunction asyncLookupFunction,
+            UserDefinedFunction syncLookupFunction) {
+        if (asyncLookupFunction != null) {
+            lookupFunctionCandidates.asyncLookupFunction = asyncLookupFunction;
+        }
+        if (syncLookupFunction != null) {
+            lookupFunctionCandidates.syncLookupFunction = syncLookupFunction;
+        }
+    }
+
+    private static void findLookupFunctionFromLegacySource(
+            LegacyTableSourceTable temporalTable,
+            int[] lookupKeyIndicesInOrder,
+            LookupFunctionCandidates lookupFunctionCandidates) {
+        UserDefinedFunction syncLookupFunction = null;
+        UserDefinedFunction asyncLookupFunction = null;
+        String[] lookupFieldNamesInOrder =
+                IntStream.of(lookupKeyIndicesInOrder)
+                        .mapToObj(temporalTable.getRowType().getFieldNames()::get)
+                        .toArray(String[]::new);
+        LegacyTableSourceTable<?> legacyTableSourceTable =
+                (LegacyTableSourceTable<?>) temporalTable;
+        LookupableTableSource<?> tableSource =
+                (LookupableTableSource<?>) legacyTableSourceTable.tableSource();
+        if (tableSource.isAsyncEnabled()) {
+            asyncLookupFunction = tableSource.getAsyncLookupFunction(lookupFieldNamesInOrder);
+        }
+        syncLookupFunction = tableSource.getLookupFunction(lookupFieldNamesInOrder);
+        setLookupFunctions(lookupFunctionCandidates, asyncLookupFunction, syncLookupFunction);
+    }
+
+    private static UserDefinedFunction selectLookupFunction(
+            UserDefinedFunction asyncLookupFunction,
+            UserDefinedFunction syncLookupFunction,
+            boolean require,
+            boolean async) {
+        if (require) {
+            return async ? asyncLookupFunction : syncLookupFunction;
+        } else {
+            if (async) {
+                // prefer async
+                return null != asyncLookupFunction ? asyncLookupFunction : syncLookupFunction;
+            }
+            // prefer sync
+            return null != syncLookupFunction ? syncLookupFunction : asyncLookupFunction;
+        }
+    }
+
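+    /**
+     * Returns true if async lookup should be preferred, i.e., when no LOOKUP hint is given or the
+     * hint does not explicitly disable async.
+     */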
+    public static boolean preferAsync(LookupJoinHintSpec lookupJoinHintSpec) {
+        // the async option has no default value: prefer async unless it is explicitly set to false
+        return null == lookupJoinHintSpec
+                || null == lookupJoinHintSpec.getAsync()
+                || lookupJoinHintSpec.isAsync();
     }
 
     public static boolean isAsyncLookup(RelOptTable temporalTable, Collection<Integer> lookupKeys) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
index d8193eed182..568909ddd5b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala
@@ -21,6 +21,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.catalog.{ObjectIdentifier, UniqueConstraint}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpec
 import org.apache.flink.table.planner.plan.schema.{IntermediateRelTable, LegacyTableSourceTable, TableSourceTable}
 import org.apache.flink.table.planner.plan.utils.{ExpressionFormat, JoinTypeUtil, LookupJoinUtil, RelExplainUtil}
 import org.apache.flink.table.planner.plan.utils.ExpressionFormat.ExpressionFormat
@@ -81,7 +82,8 @@ abstract class CommonPhysicalLookupJoin(
     val temporalTable: RelOptTable,
     val calcOnTemporalTable: Option[RexProgram],
     val joinInfo: JoinInfo,
-    val joinType: JoinRelType)
+    val joinType: JoinRelType,
+    val lookupHintSpec: Option[LookupJoinHintSpec] = Option.empty[LookupJoinHintSpec])
   extends SingleRel(cluster, traitSet, inputRel)
   with FlinkRelNode {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
index 650a2210a84..6ea85096dc0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
-import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec
+import org.apache.flink.table.planner.plan.nodes.exec.spec.{LookupJoinHintSpec, TemporalTableSourceSpec}
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
 import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRexUtil, JoinTypeUtil}
@@ -44,6 +44,7 @@ class StreamPhysicalLookupJoin(
     tableCalcProgram: Option[RexProgram],
     joinInfo: JoinInfo,
     joinType: JoinRelType,
+    lookupHintSpec: Option[LookupJoinHintSpec],
     val upsertMaterialize: Boolean = false)
   extends CommonPhysicalLookupJoin(
     cluster,
@@ -52,7 +53,8 @@ class StreamPhysicalLookupJoin(
     temporalTable,
     tableCalcProgram,
     joinInfo,
-    joinType)
+    joinType,
+    lookupHintSpec)
   with StreamPhysicalRel {
 
   override def requireWatermark: Boolean = false
@@ -66,7 +68,9 @@ class StreamPhysicalLookupJoin(
       tableCalcProgram,
       joinInfo,
       joinType,
-      upsertMaterialize)
+      lookupHintSpec,
+      upsertMaterialize
+    )
   }
 
   def copy(upsertMaterialize: Boolean): StreamPhysicalLookupJoin = {
@@ -78,7 +82,9 @@ class StreamPhysicalLookupJoin(
       tableCalcProgram,
       joinInfo,
       joinType,
-      upsertMaterialize)
+      lookupHintSpec,
+      upsertMaterialize
+    )
   }
 
   override def translateToExecNode(): ExecNode[_] = {
@@ -105,6 +111,7 @@ class StreamPhysicalLookupJoin(
       FlinkTypeFactory.toLogicalRowType(getRowType),
       lookupKeyContainsPrimaryKey(),
       upsertMaterialize,
+      lookupHintSpec.orNull,
       getRelDetailedDescription)
   }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
index a4d0baa1912..ecd2bfb3717 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/BatchCommonSubGraphBasedOptimizer.scala
@@ -25,7 +25,6 @@ import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
 import org.apache.flink.table.planner.plan.optimize.program.{BatchOptimizeContext, FlinkBatchProgram}
 import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext
 import org.apache.flink.table.planner.utils.TableConfigUtils
 import org.apache.flink.util.Preconditions
@@ -39,20 +38,9 @@ class BatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
   extends CommonSubGraphBasedOptimizer {
 
   override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
-    // TODO currently join hint only works in BATCH
-    // resolve hints before optimizing
-    val joinHintResolver = new JoinHintResolver()
-    val resolvedHintRoots = joinHintResolver.resolve(toJava(roots))
-
-    // clear query block alias before optimizing
-    val clearQueryBlockAliasResolver = new ClearQueryBlockAliasResolver
-    val resolvedAliasRoots = clearQueryBlockAliasResolver.resolve(resolvedHintRoots)
-
     // build RelNodeBlock plan
     val rootBlocks =
-      RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(
-        toScala(resolvedAliasRoots),
-        planner.getTableConfig)
+      RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, planner.getTableConfig)
     // optimize recursively RelNodeBlock
     rootBlocks.foreach(optimizeBlock)
     rootBlocks
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala
index 99f29090a64..aa5e3db807d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.optimize
 import org.apache.flink.table.planner.plan.reuse.SubplanReuser
 import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
 import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toJava
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
 import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
@@ -75,7 +76,15 @@ abstract class CommonSubGraphBasedOptimizer extends Optimizer {
    *   a list of RelNode represents an optimized RelNode DAG.
    */
   override def optimize(roots: Seq[RelNode]): Seq[RelNode] = {
-    val sinkBlocks = doOptimize(roots)
+    // resolve hints before optimizing
+    val joinHintResolver = new JoinHintResolver()
+    val resolvedHintRoots = joinHintResolver.resolve(toJava(roots))
+
+    // clear query block alias before optimizing
+    val clearQueryBlockAliasResolver = new ClearQueryBlockAliasResolver
+    val resolvedAliasRoots = clearQueryBlockAliasResolver.resolve(resolvedHintRoots)
+
+    val sinkBlocks = doOptimize(resolvedAliasRoots)
     val optimizedPlan = sinkBlocks.map {
       block =>
         val plan = block.getOptimizedPlan
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala
index 7a58809b27d..85328ea075b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLookupJoinRule.scala
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.table.planner.plan.rules.physical.stream
 
+import org.apache.flink.table.planner.hint.JoinStrategy
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpec
 import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin
@@ -77,6 +79,17 @@ object StreamPhysicalLookupJoinRule {
     val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
 
     val convInput = RelOptRule.convert(input, requiredTrait)
+
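+    // pick the LOOKUP hint attached to the join (if any) and convert it into a LookupJoinHintSpec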
+    val lookupJoinHint = join.getHints
+      .stream()
+      .filter(hint => JoinStrategy.isLookupHint(hint.hintName))
+      .findFirst()
+    val lookupJoinHintSpec = if (lookupJoinHint.isPresent) {
+      Option.apply(LookupJoinHintSpec.fromJoinHint(lookupJoinHint.get()))
+    } else {
+      Option.empty[LookupJoinHintSpec]
+    }
+
     new StreamPhysicalLookupJoin(
       cluster,
       providedTrait,
@@ -84,6 +97,7 @@ object StreamPhysicalLookupJoinRule {
       temporalTable,
       calcProgram,
       joinInfo,
-      join.getJoinType)
+      join.getJoinType,
+      lookupJoinHintSpec)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java
index 1239c46448b..6801c602539 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTest.java
@@ -18,84 +18,27 @@
 
 package org.apache.flink.table.planner.alias;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.hint.JoinStrategy;
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
-import org.apache.flink.table.planner.utils.BatchTableTestUtil;
-import org.apache.flink.table.planner.utils.PlannerMocks;
-import org.apache.flink.table.planner.utils.TableTestBase;
-import org.apache.flink.table.utils.CatalogManagerMocks;
+import org.apache.flink.table.planner.utils.TableTestUtil;
 
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.hint.RelHint;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collections;
-
-/** A test class for {@link ClearJoinHintWithInvalidPropagationShuttle}. */
-public class ClearJoinHintWithInvalidPropagationShuttleTest extends TableTestBase {
-
-    private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
-    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
-    private final CatalogManager catalogManager =
-            CatalogManagerMocks.preparedCatalogManager()
-                    .defaultCatalog("builtin", catalog)
-                    .config(
-                            Configuration.fromMap(
-                                    Collections.singletonMap(
-                                            ExecutionOptions.RUNTIME_MODE.key(),
-                                            RuntimeExecutionMode.BATCH.name())))
-                    .build();
-    private final PlannerMocks plannerMocks =
-            PlannerMocks.newBuilder()
-                    .withBatchMode(true)
-                    .withCatalogManager(catalogManager)
-                    .build();
-    private final FlinkRelBuilder builder = plannerMocks.getPlannerContext().createRelBuilder();
-
-    @Before
-    public void before() throws Exception {
-        util.tableEnv().registerCatalog("testCatalog", catalog);
-        util.tableEnv().executeSql("use catalog testCatalog");
-
-        util.tableEnv()
-                .executeSql(
-                        "CREATE TABLE t1 (\n"
-                                + "  a BIGINT\n"
-                                + ") WITH (\n"
-                                + " 'connector' = 'values',\n"
-                                + " 'bounded' = 'true'\n"
-                                + ")");
-
-        util.tableEnv()
-                .executeSql(
-                        "CREATE TABLE t2 (\n"
-                                + "  a BIGINT\n"
-                                + ") WITH (\n"
-                                + " 'connector' = 'values',\n"
-                                + " 'bounded' = 'true'\n"
-                                + ")");
-
-        util.tableEnv()
-                .executeSql(
-                        "CREATE TABLE t3 (\n"
-                                + "  a BIGINT\n"
-                                + ") WITH (\n"
-                                + " 'connector' = 'values',\n"
-                                + " 'bounded' = 'true'\n"
-                                + ")");
+/** Tests clearing join hints with invalid propagation in batch mode. */
+public class ClearJoinHintWithInvalidPropagationShuttleTest
+        extends ClearJoinHintWithInvalidPropagationShuttleTestBase {
+    @Override
+    TableTestUtil getTableTestUtil() {
+        return batchTestUtil(TableConfig.getDefault());
+    }
+
+    @Override
+    boolean isBatchMode() {
+        return true;
     }
 
     @Test
@@ -202,24 +145,4 @@ public class ClearJoinHintWithInvalidPropagationShuttleTest extends TableTestBas
                         .build();
         verifyRelPlan(root);
     }
-
-    private String buildRelPlanWithQueryBlockAlias(RelNode node) {
-        return System.lineSeparator()
-                + FlinkRelOptUtil.toString(
-                        node, SqlExplainLevel.EXPPLAN_ATTRIBUTES, false, false, true, false, true);
-    }
-
-    private void verifyRelPlan(RelNode node) {
-        String plan = buildRelPlanWithQueryBlockAlias(node);
-        util.assertEqualsOrExpand("beforePropagatingHints", plan, true);
-
-        RelNode rootAfterHintPropagation = RelOptUtil.propagateRelHints(node, false);
-        plan = buildRelPlanWithQueryBlockAlias(rootAfterHintPropagation);
-        util.assertEqualsOrExpand("afterPropagatingHints", plan, true);
-
-        RelNode rootAfterClearingJoinHintWithInvalidPropagation =
-                rootAfterHintPropagation.accept(new ClearJoinHintWithInvalidPropagationShuttle());
-        plan = buildRelPlanWithQueryBlockAlias(rootAfterClearingJoinHintWithInvalidPropagation);
-        util.assertEqualsOrExpand("afterClearingJoinHints", plan, false);
-    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java
new file mode 100644
index 00000000000..105f4e42a38
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearJoinHintWithInvalidPropagationShuttleTestBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.alias;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.junit.Before;
+
+import java.util.Collections;
+
+/** A base class for tests that clear join hints with invalid propagation. */
+public abstract class ClearJoinHintWithInvalidPropagationShuttleTestBase extends TableTestBase {
+
+    protected final TableTestUtil util = getTableTestUtil();
+
+    abstract TableTestUtil getTableTestUtil();
+
+    abstract boolean isBatchMode();
+
+    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
+    private final CatalogManager catalogManager =
+            CatalogManagerMocks.preparedCatalogManager()
+                    .defaultCatalog("builtin", catalog)
+                    .config(
+                            Configuration.fromMap(
+                                    Collections.singletonMap(
+                                            ExecutionOptions.RUNTIME_MODE.key(),
+                                            isBatchMode()
+                                                    ? RuntimeExecutionMode.BATCH.name()
+                                                    : RuntimeExecutionMode.STREAMING.name())))
+                    .build();
+
+    private final PlannerMocks plannerMocks =
+            PlannerMocks.newBuilder()
+                    .withBatchMode(isBatchMode())
+                    .withCatalogManager(catalogManager)
+                    .build();
+    protected final FlinkRelBuilder builder = plannerMocks.getPlannerContext().createRelBuilder();
+
+    @Before
+    public void before() throws Exception {
+        util.tableEnv().registerCatalog("testCatalog", catalog);
+        util.tableEnv().executeSql("use catalog testCatalog");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE t1 (\n"
+                                + "  a BIGINT\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = '"
+                                + isBatchMode()
+                                + "'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE t2 (\n"
+                                + "  a BIGINT\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = '"
+                                + isBatchMode()
+                                + "'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE t3 (\n"
+                                + "  a BIGINT\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = '"
+                                + isBatchMode()
+                                + "'\n"
+                                + ")");
+    }
+
+    protected String buildRelPlanWithQueryBlockAlias(RelNode node) {
+        return System.lineSeparator()
+                + FlinkRelOptUtil.toString(
+                        node, SqlExplainLevel.EXPPLAN_ATTRIBUTES, false, false, true, false, true);
+    }
+
+    protected void verifyRelPlan(RelNode node) {
+        String plan = buildRelPlanWithQueryBlockAlias(node);
+        util.assertEqualsOrExpand("beforePropagatingHints", plan, true);
+
+        RelNode rootAfterHintPropagation = RelOptUtil.propagateRelHints(node, false);
+        plan = buildRelPlanWithQueryBlockAlias(rootAfterHintPropagation);
+        util.assertEqualsOrExpand("afterPropagatingHints", plan, true);
+
+        RelNode rootAfterClearingJoinHintWithInvalidPropagation =
+                rootAfterHintPropagation.accept(new ClearJoinHintWithInvalidPropagationShuttle());
+        plan = buildRelPlanWithQueryBlockAlias(rootAfterClearingJoinHintWithInvalidPropagation);
+        util.assertEqualsOrExpand("afterClearingJoinHints", plan, false);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java
new file mode 100644
index 00000000000..87d39da661c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.alias;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpecTest;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests clearing lookup join hints with invalid propagation in stream mode. */
+public class ClearLookupJoinHintWithInvalidPropagationShuttleTest
+        extends ClearJoinHintWithInvalidPropagationShuttleTestBase {
+    @Override
+    TableTestUtil getTableTestUtil() {
+        return streamTestUtil(TableConfig.getDefault());
+    }
+
+    @Override
+    boolean isBatchMode() {
+        return false;
+    }
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE src (\n"
+                                + "  a BIGINT\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE lookup (\n"
+                                + "  a BIGINT\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + ")");
+    }
+
+    @Test
+    public void testNoNeedToClearLookupHint() {
+        // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss',
+        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10') */ *
+        //  FROM src
+        //  JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D
+        //      ON T.a = D.a
+        RelNode root =
+                builder.scan("src")
+                        .scan("lookup")
+                        .snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME))
+                        .join(
+                                JoinRelType.INNER,
+                                builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a")))
+                        .project(builder.field(1, 0, "a"))
+                        .hints(LookupJoinHintSpecTest.getLookupJoinHint("lookup", false, true))
+                        .build();
+        verifyRelPlan(root);
+    }
+
+    @Test
+    public void testClearLookupHintWithInvalidPropagationToViewWhileViewHasLookupHints() {
+        // SELECT /*+ LOOKUP('table'='lookup', 'async'='true', 'output-mode'='allow_unordered',
+        // 'capacity'='1000', 'timeout'='300 s') */ *
+        //   FROM (
+        //     SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss',
+        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10') */
+        //       src.a, src.proctime
+        //     FROM src
+        //       JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D
+        //         ON T.a = D.id
+        //     ) t1 JOIN lookup FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.a = t2.a
+        RelNode root =
+                builder.scan("src")
+                        .scan("lookup")
+                        .snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME))
+                        .join(
+                                JoinRelType.INNER,
+                                builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a")))
+                        .project(builder.field(1, 0, "a"))
+                        .hints(LookupJoinHintSpecTest.getLookupJoinHint("lookup", false, true))
+                        .hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t1").build())
+                        .scan("src")
+                        .snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME))
+                        .join(
+                                JoinRelType.INNER,
+                                builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a")))
+                        .project(builder.field(1, 0, "a"))
+                        .hints(LookupJoinHintSpecTest.getLookupJoinHint("lookup", true, false))
+                        .build();
+        verifyRelPlan(root);
+    }
+
+    @Test
+    public void testClearLookupHintWithInvalidPropagationToSubQuery() {
+        // SELECT /*+ LOOKUP('table'='lookup', 'retry-predicate'='lookup_miss',
+        // 'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10',
+        // 'async'='true', 'output-mode'='allow_unordered', 'capacity'='1000', 'timeout'='300 s') */ *
+        //   FROM (
+        //     SELECT src.a
+        //     FROM src
+        //     JOIN lookup FOR SYSTEM_TIME AS OF T.proctime AS D
+        //       ON T.a = D.id
+        //   ) t1 JOIN src t2 ON t1.a = t2.a
+        RelNode root =
+                builder.scan("src")
+                        .scan("lookup")
+                        .snapshot(builder.getRexBuilder().makeCall(FlinkSqlOperatorTable.PROCTIME))
+                        .join(
+                                JoinRelType.INNER,
+                                builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a")))
+                        .project(builder.field(1, 0, "a"))
+                        .hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t1").build())
+                        .scan("src")
+                        .hints(RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("t2").build())
+                        .join(
+                                JoinRelType.INNER,
+                                builder.equals(builder.field(2, 0, "a"), builder.field(2, 1, "a")))
+                        .project(builder.field(1, 0, "a"))
+                        .hints(LookupJoinHintSpecTest.getLookupJoinHint("lookup", true, true))
+                        .build();
+        verifyRelPlan(root);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/BroadcastJoinHintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.java
similarity index 94%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/BroadcastJoinHintTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.java
index 30d7e290fca..1b93904339a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/BroadcastJoinHintTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.batch.sql.join.hints;
+package org.apache.flink.table.planner.plan.hints.batch;
 
 import org.apache.flink.table.planner.hint.JoinStrategy;
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/JoinHintTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
similarity index 99%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/JoinHintTestBase.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
index c3f7473d6ca..25bc946c70d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/JoinHintTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.batch.sql.join.hints;
+package org.apache.flink.table.planner.plan.hints.batch;
 
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.SqlParserException;
@@ -57,6 +57,8 @@ public abstract class JoinHintTestBase extends TableTestBase {
 
     private final List<String> allJoinHintNames =
             Lists.newArrayList(JoinStrategy.values()).stream()
+                    // the LOOKUP hint uses different kv-options from the other join hints
+                    .filter(hint -> hint != JoinStrategy.LOOKUP)
                     .map(JoinStrategy::getJoinHintName)
                     .collect(Collectors.toList());
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/NestLoopJoinHintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.java
similarity index 94%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/NestLoopJoinHintTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.java
index 40dcb9ba0e3..1daf888052e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/NestLoopJoinHintTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.batch.sql.join.hints;
+package org.apache.flink.table.planner.plan.hints.batch;
 
 import org.apache.flink.table.planner.hint.JoinStrategy;
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleHashJoinHintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.java
similarity index 94%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleHashJoinHintTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.java
index 23bbb51d5d7..cc9f2be89e2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleHashJoinHintTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.batch.sql.join.hints;
+package org.apache.flink.table.planner.plan.hints.batch;
 
 import org.apache.flink.table.planner.hint.JoinStrategy;
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleMergeJoinHintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.java
similarity index 94%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleMergeJoinHintTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.java
index 41a126ddeed..deaa6c97b88 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleMergeJoinHintTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.batch.sql.join.hints;
+package org.apache.flink.table.planner.plan.hints.batch;
 
 import org.apache.flink.table.planner.hint.JoinStrategy;
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupJoinHintSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupJoinHintSpecSerdeTest.java
new file mode 100644
index 00000000000..bb5b820f338
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LookupJoinHintSpecSerdeTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpecTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeTestUtil.testJsonRoundTrip;
+
+/** Serde tests for {@link LookupJoinHintSpec}. */
+public class LookupJoinHintSpecSerdeTest {
+
+    @Test
+    void testJoinSpecSerde() throws IOException {
+        LookupJoinHintSpec lookupJoinHintSpec =
+                LookupJoinHintSpec.fromJoinHint(LookupJoinHintSpecTest.completeLookupHint);
+        testJsonRoundTrip(lookupJoinHintSpec, LookupJoinHintSpec.class);
+
+        lookupJoinHintSpec =
+                LookupJoinHintSpec.fromJoinHint(LookupJoinHintSpecTest.lookupHintWithAsync);
+        testJsonRoundTrip(lookupJoinHintSpec, LookupJoinHintSpec.class);
+
+        lookupJoinHintSpec =
+                LookupJoinHintSpec.fromJoinHint(LookupJoinHintSpecTest.lookupHintWithRetry);
+        testJsonRoundTrip(lookupJoinHintSpec, LookupJoinHintSpec.class);
+
+        lookupJoinHintSpec =
+                LookupJoinHintSpec.fromJoinHint(LookupJoinHintSpecTest.lookupHintWithTableOnly);
+        testJsonRoundTrip(lookupJoinHintSpec, LookupJoinHintSpec.class);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpecTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpecTest.java
new file mode 100644
index 00000000000..0aa0aae4013
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/spec/LookupJoinHintSpecTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.spec;
+
+import org.apache.flink.table.planner.hint.JoinStrategy;
+import org.apache.flink.table.planner.hint.LookupJoinHintOptions;
+import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
+
+import org.apache.calcite.rel.hint.RelHint;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link LookupJoinHintSpec}. */
+public class LookupJoinHintSpecTest {
+    public static RelHint completeLookupHint = getLookupJoinHint(true, true);
+    public static RelHint lookupHintWithAsync = getLookupJoinHint(true, false);
+    public static RelHint lookupHintWithRetry = getLookupJoinHint(false, true);
+    public static RelHint lookupHintWithTableOnly = getLookupJoinHint(false, false);
+
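+    /** Builds a LOOKUP {@link RelHint} on the given table, optionally with async and retry options. */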
+    public static RelHint getLookupJoinHint(String table, boolean withAsync, boolean withRetry) {
+        Map<String, String> kvOptions = new HashMap<>();
+        kvOptions.put(LookupJoinHintOptions.LOOKUP_TABLE.key(), table);
+        if (withAsync) {
+            addAsyncOptions(kvOptions);
+        }
+        if (withRetry) {
+            addRetryOptions(kvOptions);
+        }
+        return RelHint.builder(JoinStrategy.LOOKUP.getJoinHintName())
+                .hintOptions(kvOptions)
+                .build();
+    }
+
+    public static RelHint getLookupJoinHint(boolean withAsync, boolean withRetry) {
+        return getLookupJoinHint("TestTable", withAsync, withRetry);
+    }
+
+    private static void addAsyncOptions(Map<String, String> kvOptions) {
+        kvOptions.put(LookupJoinHintOptions.ASYNC_LOOKUP.key(), "true");
+        kvOptions.put(LookupJoinHintOptions.ASYNC_CAPACITY.key(), "1000");
+        kvOptions.put(LookupJoinHintOptions.ASYNC_OUTPUT_MODE.key(), "allow_unordered");
+        kvOptions.put(LookupJoinHintOptions.ASYNC_TIMEOUT.key(), "300 s");
+    }
+
+    private static void addRetryOptions(Map<String, String> kvOptions) {
+        kvOptions.put(LookupJoinHintOptions.RETRY_PREDICATE.key(), "lookup_miss");
+        kvOptions.put(LookupJoinHintOptions.RETRY_STRATEGY.key(), "fixed_delay");
+        kvOptions.put(LookupJoinHintOptions.FIXED_DELAY.key(), "155 ms");
+        kvOptions.put(LookupJoinHintOptions.MAX_ATTEMPTS.key(), "10");
+    }
+
+    @Test
+    void testJoinHintToRetryStrategy() {
+        LookupJoinHintSpec lookupJoinHintSpec = LookupJoinHintSpec.fromJoinHint(completeLookupHint);
+        assertTrue(lookupJoinHintSpec.toRetryStrategy() != ResultRetryStrategy.NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    void testJoinHintWithTableOnlyToRetryStrategy() {
+        LookupJoinHintSpec lookupJoinHintSpec =
+                LookupJoinHintSpec.fromJoinHint(lookupHintWithTableOnly);
+        assertTrue(lookupJoinHintSpec.toRetryStrategy() == ResultRetryStrategy.NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    void testJoinHintWithAsyncToRetryStrategy() {
+        LookupJoinHintSpec lookupJoinHintSpec =
+                LookupJoinHintSpec.fromJoinHint(lookupHintWithAsync);
+        assertTrue(lookupJoinHintSpec.toRetryStrategy() == ResultRetryStrategy.NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    void testJoinHintWithRetryToRetryStrategy() {
+        LookupJoinHintSpec lookupJoinHintSpec =
+                LookupJoinHintSpec.fromJoinHint(lookupHintWithRetry);
+        assertTrue(lookupJoinHintSpec.toRetryStrategy() != ResultRetryStrategy.NO_RETRY_STRATEGY);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
index c0ec231d891..05b9200280a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
@@ -76,11 +76,24 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
                         + "  age int"
                         + ") with (\n"
                         + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false'\n"
-                        + ")";
-        tEnv.executeSql(sinkTable1);
+                        + "  'sink-insert-only' = 'false')";
+        String sinkTable2 =
+                "CREATE TABLE MySink1 (\n"
+                        + "  a int,\n"
+                        + "  b varchar,"
+                        + "  c bigint,"
+                        + "  proctime timestamp(3),"
+                        + "  rowtime timestamp(3),"
+                        + "  id int,"
+                        + "  name varchar,"
+                        + "  age int"
+                        + ") with (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'table-sink-class' = 'DEFAULT')";
         tEnv.executeSql(srcTableA);
         tEnv.executeSql(srcTableB);
+        tEnv.executeSql(sinkTable1);
+        tEnv.executeSql(sinkTable2);
     }
 
     @Test
@@ -182,4 +195,53 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
                         + "LEFT JOIN LookupTable "
                         + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
     }
+
+    @Test
+    public void testJoinTemporalTableWithAsyncHint() {
+        // LookupTable has a sync lookup function only, just verify that the hint takes effect
+        util.verifyJsonPlan(
+                "INSERT INTO MySink1 SELECT "
+                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'output-mode'='allow_unordered') */ * "
+                        + "FROM MyTable AS T JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
+    }
+
+    @Test
+    public void testJoinTemporalTableWithAsyncHint2() {
+        // LookupTable has a sync lookup function only, just verify that the hint takes effect
+        util.verifyJsonPlan(
+                "INSERT INTO MySink1 SELECT "
+                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000') */ * "
+                        + "FROM MyTable AS T JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
+    }
+
+    @Test
+    public void testJoinTemporalTableWithRetryHint() {
+        util.verifyJsonPlan(
+                "INSERT INTO MySink1 SELECT "
+                        + "/*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
+                        + "FROM MyTable AS T JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
+    }
+
+    @Test
+    public void testJoinTemporalTableWithAsyncRetryHint() {
+        // LookupTable has a sync lookup function only, just verify that the hint takes effect
+        util.verifyJsonPlan(
+                "INSERT INTO MySink1 SELECT "
+                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
+                        + "FROM MyTable AS T JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
+    }
+
+    @Test
+    public void testJoinTemporalTableWithAsyncRetryHint2() {
+        // LookupTable has a sync lookup function only, just verify that the hint takes effect
+        util.verifyJsonPlan(
+                "INSERT INTO MySink1 SELECT "
+                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
+                        + "FROM MyTable AS T JOIN LookupTable "
+                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java
index 4cbbd62106c..3fd2b8131e1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.internal.StatementSetImpl;
 import org.apache.flink.table.planner.hint.JoinStrategy;
-import org.apache.flink.table.planner.plan.batch.sql.join.hints.JoinHintTestBase;
+import org.apache.flink.table.planner.plan.hints.batch.JoinHintTestBase;
 import org.apache.flink.table.planner.utils.TableTestBase;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java
index fcc6f87927d..2789b57d4ff 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.internal.StatementSetImpl;
 import org.apache.flink.table.planner.hint.JoinStrategy;
-import org.apache.flink.table.planner.plan.batch.sql.join.hints.JoinHintTestBase;
+import org.apache.flink.table.planner.plan.hints.batch.JoinHintTestBase;
 import org.apache.flink.table.planner.utils.TableTestBase;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml
new file mode 100644
index 00000000000..f6823b468d6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/alias/ClearLookupJoinHintWithInvalidPropagationShuttleTest.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testNoNeedToClearLookupHint">
+    <Resource name="beforePropagatingHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+      +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+    <Resource name="afterPropagatingHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=10, fixed-delay=155 ms, retry-predicate=lookup_miss, table=lookup}]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+      +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+    <Resource name="afterClearingJoinHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=10, fixed-delay=155 ms, retry-predicate=lookup_miss, table=lookup}]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+      +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testClearLookupHintWithInvalidPropagationToSubQuery">
+    <Resource name="beforePropagatingHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t1]]]]), rowType=[RecordType(BIGINT a)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :     :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   :     +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+   :        +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalTableScan(table=[[builtin, default, src]], hints=[[[ALIAS inheritPath:[] options:[t2]]]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+    <Resource name="afterPropagatingHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, retry-strategy=fixed_delay, max-attempts=10, output-mode=allow_unordered, fixed-delay=155 ms, retry-predicate=lookup_miss, table=lookup, timeout=300 s, capacity=1000}]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t1]]]]), rowType=[RecordType(BIGINT a)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0, 0, 0] options:{async=true, retry-strategy=fixed_delay, max-attempts=10, output-mode=allow_unordered, fixed-delay=155 ms, retry-predicate=lookup_miss, table=lookup, timeout=300 s, capacity=1000}]]], hints=[[[ALIAS inheritPath:[0] options:[t1]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :     :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   :     +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+   :        +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalTableScan(table=[[builtin, default, src]], hints=[[[ALIAS inheritPath:[] options:[t2]]]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+    <Resource name="afterClearingJoinHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, retry-strategy=fixed_delay, max-attempts=10, output-mode=allow_unordered, fixed-delay=155 ms, retry-predicate=lookup_miss, table=lookup, timeout=300 s, capacity=1000}]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t1]]]]), rowType=[RecordType(BIGINT a)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], hints=[[[ALIAS inheritPath:[0] options:[t1]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :     :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   :     +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+   :        +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalTableScan(table=[[builtin, default, src]], hints=[[[ALIAS inheritPath:[] options:[t2]]]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testClearLookupHintWithInvalidPropagationToViewWhileViewHasLookupHints">
+    <Resource name="beforePropagatingHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t1]]]]), rowType=[RecordType(BIGINT a)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :     :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   :     +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+   :        +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+      +- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+    <Resource name="afterPropagatingHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, output-mode=allow_unordered, table=lookup, timeout=300 s, capacity=1000}]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t1]]]]), rowType=[RecordType(BIGINT a)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=10, fixed-delay=155 ms, retry-predicate=lookup_miss, table=lookup}][LOOKUP inheritPath:[0, 0, 0] options:{async=true, output-mode=allow_unordered, table=lookup, timeout=300 s, capacity=1000}]]], hints=[[[ALIAS inheritPath:[0] options:[t1]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :     :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   :     +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+   :        +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+      +- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+    <Resource name="afterClearingJoinHints">
+      <![CDATA[
+LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, output-mode=allow_unordered, table=lookup, timeout=300 s, capacity=1000}]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t1]]]]), rowType=[RecordType(BIGINT a)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=10, fixed-delay=155 ms, retry-predicate=lookup_miss, table=lookup}]]], hints=[[[ALIAS inheritPath:[0] options:[t1]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :     :- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+   :     +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+   :        +- LogicalTableScan(table=[[builtin, default, lookup]]), rowType=[RecordType(BIGINT a)]
+   +- LogicalSnapshot(period=[PROCTIME()]), rowType=[RecordType(BIGINT a)]
+      +- LogicalTableScan(table=[[builtin, default, src]]), rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+  </TestCase>
+</Root>
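
The plan dumps above are produced by the new LOOKUP join hint. A minimal sketch of the hint syntax
with the fixed-delay retry options seen in those plans (Orders and Customers are hypothetical tables
assumed to be registered beforehand; the hint must name the lookup table itself, as table aliases are
not supported yet):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LookupHintSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Retry a missed lookup up to 10 times with a fixed delay of 155 ms between attempts.
            tEnv.executeSql(
                    "SELECT /*+ LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', "
                            + "'retry-strategy'='fixed_delay', 'fixed-delay'='155 ms', 'max-attempts'='10') */ "
                            + "o.id, c.name "
                            + "FROM Orders AS o "
                            + "JOIN Customers FOR SYSTEM_TIME AS OF o.proctime AS c ON o.cid = c.id");
        }
    }
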
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/BroadcastJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
similarity index 100%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/BroadcastJoinHintTest.xml
rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/NestLoopJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
similarity index 100%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/NestLoopJoinHintTest.xml
rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleHashJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
similarity index 100%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleHashJoinHintTest.xml
rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleMergeJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
similarity index 100%
rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/hints/ShuffleMergeJoinHintTest.xml
rename to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
index 719b432fbce..ecd654ad000 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
@@ -473,4 +473,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint.out
similarity index 98%
copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
copy to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint.out
index 719b432fbce..5fb580acb2f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint.out
@@ -304,6 +304,12 @@
       } ]
     },
     "lookupKeyContainsPrimaryKey" : false,
+    "joinHint" : {
+      "async" : true,
+      "output-mode" : "ALLOW_UNORDERED",
+      "capacity" : 100,
+      "timeout" : 180000
+    },
     "description" : "LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])"
   }, {
     "id" : 5,
@@ -388,7 +394,7 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`MySink1`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -435,7 +441,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT, `proctime` TIMESTAMP(3), `rowtime` TIMESTAMP(3), `id` INT, `name` VARCHAR(2147483647), `age` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c, proctime, rowtime, id, name, age])"
+    "description" : "Sink(table=[default_catalog.default_database.MySink1], fields=[a, b, c, proctime, rowtime, id, name, age])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint2.out
similarity index 98%
copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
copy to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint2.out
index 719b432fbce..82c38b72f98 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncHint2.out
@@ -304,6 +304,12 @@
       } ]
     },
     "lookupKeyContainsPrimaryKey" : false,
+    "joinHint" : {
+      "async" : true,
+      "output-mode" : "ORDERED",
+      "capacity" : 1000,
+      "timeout" : 600000
+    },
     "description" : "LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])"
   }, {
     "id" : 5,
@@ -388,7 +394,7 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`MySink1`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -435,7 +441,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT, `proctime` TIMESTAMP(3), `rowtime` TIMESTAMP(3), `id` INT, `name` VARCHAR(2147483647), `age` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c, proctime, rowtime, id, name, age])"
+    "description" : "Sink(table=[default_catalog.default_database.MySink1], fields=[a, b, c, proctime, rowtime, id, name, age])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint.out
similarity index 97%
copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
copy to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint.out
index 719b432fbce..a8b0f6a4131 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint.out
@@ -304,6 +304,16 @@
       } ]
     },
     "lookupKeyContainsPrimaryKey" : false,
+    "joinHint" : {
+      "async" : true,
+      "output-mode" : "ORDERED",
+      "capacity" : 100,
+      "timeout" : 180000,
+      "retry-predicate" : "lookup_miss",
+      "retry-strategy" : "FIXED_DELAY",
+      "fixed-delay" : 10000,
+      "max-attempts" : 3
+    },
     "description" : "LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])"
   }, {
     "id" : 5,
@@ -388,7 +398,7 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`MySink1`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -435,7 +445,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT, `proctime` TIMESTAMP(3), `rowtime` TIMESTAMP(3), `id` INT, `name` VARCHAR(2147483647), `age` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c, proctime, rowtime, id, name, age])"
+    "description" : "Sink(table=[default_catalog.default_database.MySink1], fields=[a, b, c, proctime, rowtime, id, name, age])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint2.out
similarity index 97%
copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
copy to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint2.out
index 719b432fbce..936696bc74e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithAsyncRetryHint2.out
@@ -304,6 +304,16 @@
       } ]
     },
     "lookupKeyContainsPrimaryKey" : false,
+    "joinHint" : {
+      "async" : true,
+      "output-mode" : "ORDERED",
+      "capacity" : 1000,
+      "timeout" : 600000,
+      "retry-predicate" : "lookup_miss",
+      "retry-strategy" : "FIXED_DELAY",
+      "fixed-delay" : 10000,
+      "max-attempts" : 3
+    },
     "description" : "LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])"
   }, {
     "id" : 5,
@@ -388,7 +398,7 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`MySink1`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -435,7 +445,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT, `proctime` TIMESTAMP(3), `rowtime` TIMESTAMP(3), `id` INT, `name` VARCHAR(2147483647), `age` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c, proctime, rowtime, id, name, age])"
+    "description" : "Sink(table=[default_catalog.default_database.MySink1], fields=[a, b, c, proctime, rowtime, id, name, age])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
index 0b40be07085..b4f2cb7d22d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out
@@ -457,4 +457,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithRetryHint.out
similarity index 97%
copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
copy to flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithRetryHint.out
index 719b432fbce..c8dc6c5ba8a 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithRetryHint.out
@@ -304,6 +304,15 @@
       } ]
     },
     "lookupKeyContainsPrimaryKey" : false,
+    "joinHint" : {
+      "output-mode" : "ORDERED",
+      "capacity" : 100,
+      "timeout" : 180000,
+      "retry-predicate" : "lookup_miss",
+      "retry-strategy" : "FIXED_DELAY",
+      "fixed-delay" : 10000,
+      "max-attempts" : 3
+    },
     "description" : "LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])"
   }, {
     "id" : 5,
@@ -388,7 +397,7 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`MySink1`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
@@ -435,7 +444,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT, `proctime` TIMESTAMP(3), `rowtime` TIMESTAMP(3), `id` INT, `name` VARCHAR(2147483647), `age` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c, proctime, rowtime, id, name, age])"
+    "description" : "Sink(table=[default_catalog.default_database.MySink1], fields=[a, b, c, proctime, rowtime, id, name, age])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index 7993fa33c99..e1de434979f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -129,6 +129,160 @@ GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c
             +- GroupAggregate(groupBy=[a, b], select=[a, b, SUM(c) AS c, SUM(d) AS d])
                +- Exchange(distribution=[hash[a, b]])
                   +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinAsyncTableWithAsyncHint[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinAsyncTableWithAsyncHint[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinAsyncTableWithSyncHint[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinAsyncTableWithSyncHint[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinSyncTableWithAsyncHint[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinSyncTableWithAsyncHint[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -201,6 +355,53 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, CAST(10 AS INTEGER) AS age])
 +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[age=10, id=a], where=[((age = 10) AND (CAST(name AS BIGINT) > 1000))], select=[a, b, c, proctime, rowtime, id, name])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinTemporalTableWithNestedQuery[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM (SELECT a, b, proctime FROM MyTable WHERE c > 1000) AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
+   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
+   :  +- LogicalFilter(condition=[>($2, 1000)])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, proctime, id, name, age])
+   +- Calc(select=[a, b, proctime], where=[(c > 1000)])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -407,31 +608,6 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, CAST(11 AS INTEGER) AS age])
 +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[age=11], where=[(age = 11)], joinCondition=[(b = $f3)], select=[a, b, c, proctime, rowtime, id, name, CONCAT(name, '!') AS $f3])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinTemporalTableWithNestedQuery[LegacyTableSource=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM (SELECT a, b, proctime FROM MyTable WHERE c > 1000) AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], proctime=[$2], id=[$3], name=[$4], age=[$5])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
-   :- LogicalProject(a=[$0], b=[$1], proctime=[$3])
-   :  +- LogicalFilter(condition=[>($2, 1000)])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-   +- LogicalFilter(condition=[=($cor0.a, $0)])
-      +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, proctime, id, name, age])
-   +- Calc(select=[a, b, proctime], where=[(c > 1000)])
-      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 9806ebe10a3..bf83987afa3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.planner.plan.logical.{LogicalWindow, _}
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.calcite._
 import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalWindowTableFunction
+import org.apache.flink.table.planner.plan.nodes.exec.spec.LookupJoinHintSpec
 import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.nodes.physical.batch._
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
@@ -2625,7 +2626,8 @@ class FlinkRelMdHandlerTestBase {
       streamScan.getTable,
       None,
       JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)),
-      JoinRelType.INNER
+      JoinRelType.INNER,
+      Option.empty[LookupJoinHintSpec]
     )
     (batchLookupJoin, streamLookupJoin)
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index d02b070f7c5..d7ebfcb93e2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -69,6 +69,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
 
     if (legacyTableSource) {
       TestTemporalTable.createTemporaryTable(util.tableEnv, "LookupTable")
+      TestTemporalTable.createTemporaryTable(util.tableEnv, "AsyncLookupTable", async = true)
     } else {
       util.addTable("""
                       |CREATE TABLE LookupTable (
@@ -79,6 +80,16 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
                       |  'connector' = 'values'
                       |)
                       |""".stripMargin)
+      util.addTable("""
+                      |CREATE TABLE AsyncLookupTable (
+                      |  `id` INT,
+                      |  `name` STRING,
+                      |  `age` INT
+                      |) WITH (
+                      |  'connector' = 'values',
+                      |  'async' = 'true'
+                      |)
+                      |""".stripMargin)
 
       util.addTable("""
                       |CREATE TABLE LookupTableWithComputedColumn (
@@ -585,6 +596,201 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
       replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(actual))))
   }
 
+  @Test
+  def testInvalidJoinHint(): Unit = {
+    // missing required hint option 'table'
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('tableName'='LookupTable') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint: incomplete required option(s): [Key: 'table' , default: null (fallback keys: [])]",
+      classOf[AssertionError]
+    )
+
+    // invalid async option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'async'='yes') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value 'yes' for key 'async'",
+      classOf[AssertionError]
+    )
+
+    // invalid async output-mode option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true', 'output-mode'='allow-unordered') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value 'allow-unordered' for key 'output-mode'",
+      classOf[AssertionError]
+    )
+
+    // invalid async timeout option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='300 si') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value '300 si' for key 'timeout'",
+      classOf[AssertionError]
+    )
+
+    // invalid retry-strategy option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-strategy'='fixed-delay') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value 'fixed-delay' for key 'retry-strategy'",
+      classOf[AssertionError]
+    )
+
+    // invalid retry fixed-delay option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'fixed-delay'='100 nano sec') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value '100 nano sec' for key 'fixed-delay'",
+      classOf[AssertionError]
+    )
+
+    // invalid retry max-attempts option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'max-attempts'='100.0') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value '100.0' for key 'max-attempts'",
+      classOf[AssertionError]
+    )
+
+    // incomplete retry hint options
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'max-attempts'='100') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint: retry options can be both null or all not null",
+      classOf[AssertionError]
+    )
+
+    // invalid retry option value
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='exception', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='-3') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint option: unsupported retry-predicate 'exception', only 'lookup_miss' is supported currently",
+      classOf[AssertionError]
+    )
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='-3') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint option: max-attempts value should be positive integer but was -3",
+      classOf[AssertionError]
+    )
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='-10s', 'max-attempts'='3') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value '-10s' for key 'fixed-delay'",
+      classOf[AssertionError]
+    )
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup-miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint option: unsupported retry-predicate 'lookup-miss', only 'lookup_miss' is supported currently",
+      classOf[AssertionError]
+    )
+    expectExceptionThrown(
+      """
+        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed-delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+        |FROM MyTable AS T
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+        | ON T.a = D.id
+        |""".stripMargin,
+      "Invalid LOOKUP hint options: Could not parse value 'fixed-delay' for key 'retry-strategy'",
+      classOf[AssertionError]
+    )
+  }
+
+  @Test
+  def testJoinHintWithTableAlias(): Unit = {
+    // TODO support table alias in LOOKUP hint after FLINK-28850 (make LogicalSnapshot Hintable)
+    thrown.expectMessage(
+      "The options of following hints cannot match the name of input tables or views")
+    thrown.expect(classOf[ValidationException])
+    val sql = "SELECT /*+ LOOKUP('table'='D') */ * FROM MyTable AS T JOIN LookupTable " +
+      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinHintWithTableNameOnly(): Unit = {
+    val sql = "SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable " +
+      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinSyncTableWithAsyncHint(): Unit = {
+    val sql =
+      "SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinAsyncTableWithAsyncHint(): Unit = {
+    val sql =
+      "SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * " +
+        "FROM MyTable AS T JOIN AsyncLookupTable " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinAsyncTableWithSyncHint(): Unit = {
+    val sql =
+      "SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * " +
+        "FROM MyTable AS T JOIN AsyncLookupTable " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
+    util.verifyExecPlan(sql)
+  }
+
   // ==========================================================================================
 
   private def createLookupTable(tableName: String, lookupFunction: UserDefinedFunction): Unit = {
@@ -636,7 +842,10 @@ object LookupJoinTest {
   }
 }
 
-class TestTemporalTable(bounded: Boolean = false, val keys: Array[String] = Array())
+class TestTemporalTable(
+    bounded: Boolean = false,
+    val keys: Array[String] = Array.empty[String],
+    async: Boolean = false)
   extends LookupableTableSource[RowData]
   with StreamTableSource[RowData] {
 
@@ -657,7 +866,7 @@ class TestTemporalTable(bounded: Boolean = false, val keys: Array[String] = Arra
         "this method should never be called.")
   }
 
-  override def isAsyncEnabled: Boolean = false
+  override def isAsyncEnabled: Boolean = async
 
   override def isBounded: Boolean = this.bounded
 
@@ -690,8 +899,9 @@ object TestTemporalTable {
   def createTemporaryTable(
       tEnv: TableEnvironment,
       tableName: String,
-      isBounded: Boolean = false): Unit = {
-    val source = new TestTemporalTable(isBounded)
+      isBounded: Boolean = false,
+      async: Boolean = false): Unit = {
+    val source = new TestTemporalTable(isBounded, async = async)
     tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(tableName, source)
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 710454fee88..8826a2ccf46 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -415,6 +415,8 @@ class AsyncLookupJoinITCase(
     new java.lang.Long(l)
   }
 
+// TODO add case with async and retry in FLINK-28849
+
 }
 
 object AsyncLookupJoinITCase {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index 752a903ca39..c871ab24c34 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -708,6 +708,8 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
     val expected = Seq("3", "8", "9")
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
+  // TODO add case with retry hint in FLINK-28849
+
 }
 
 object LookupJoinITCase {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java
new file mode 100644
index 00000000000..33e8ab564a7
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/ResultRetryStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * A utility class that wraps the DataStream API {@link AsyncRetryStrategy} to support both sync
+ * and async retries in the table module. The main consideration is to keep the class name free of
+ * the async scope and to highlight that the retry predicate applies only to the result (not exceptions).
+ */
+public class ResultRetryStrategy implements AsyncRetryStrategy<RowData> {
+    public static final ResultRetryStrategy NO_RETRY_STRATEGY =
+            new ResultRetryStrategy(AsyncRetryStrategies.NO_RETRY_STRATEGY);
+    private AsyncRetryStrategy retryStrategy;
+
+    private ResultRetryStrategy(AsyncRetryStrategy retryStrategy) {
+        this.retryStrategy = retryStrategy;
+    }
+
+    /** Creates a fixed-delay retry strategy with the given parameters. */
+    public static ResultRetryStrategy fixedDelayRetry(
+            int maxAttempts,
+            long backoffTimeMillis,
+            Predicate<Collection<RowData>> resultPredicate) {
+        return new ResultRetryStrategy(
+                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(
+                                maxAttempts, backoffTimeMillis)
+                        .ifResult(resultPredicate)
+                        .build());
+    }
+
+    @Override
+    public boolean canRetry(int currentAttempts) {
+        return retryStrategy.canRetry(currentAttempts);
+    }
+
+    @Override
+    public long getBackoffTimeMillis(int currentAttempts) {
+        return retryStrategy.getBackoffTimeMillis(currentAttempts);
+    }
+
+    @Override
+    public AsyncRetryPredicate<RowData> getRetryPredicate() {
+        return retryStrategy.getRetryPredicate();
+    }
+}
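
A minimal usage sketch of the factory method above, assuming the same classpath as the new test at
the end of this patch: build a strategy that retries up to 3 times with a 10 ms backoff whenever a
lookup returns an empty result.

    import org.apache.flink.streaming.util.retryable.RetryPredicates;
    import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;

    public class ResultRetryStrategySketch {
        public static void main(String[] args) {
            ResultRetryStrategy retry =
                    ResultRetryStrategy.fixedDelayRetry(3, 10L, RetryPredicates.EMPTY_RESULT_PREDICATE);
            // canRetry and getBackoffTimeMillis are what the retry loop consults on each attempt.
            System.out.println(retry.canRetry(1) + ", backoff " + retry.getBackoffTimeMillis(1) + " ms");
        }
    }
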
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableLookupFunctionDelegator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableLookupFunctionDelegator.java
new file mode 100644
index 00000000000..789aba5bb06
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/RetryableLookupFunctionDelegator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/** A delegator that wraps the user's {@link LookupFunction} to handle retries. */
+public class RetryableLookupFunctionDelegator extends LookupFunction {
+
+    private final LookupFunction userLookupFunction;
+
+    private final ResultRetryStrategy retryStrategy;
+
+    private final boolean retryEnabled;
+
+    private transient Predicate<Collection<RowData>> retryResultPredicate;
+
+    public RetryableLookupFunctionDelegator(
+            @Nonnull LookupFunction userLookupFunction,
+            @Nonnull ResultRetryStrategy retryStrategy) {
+        this.userLookupFunction = userLookupFunction;
+        this.retryStrategy = retryStrategy;
+        this.retryEnabled = retryStrategy.getRetryPredicate().resultPredicate().isPresent();
+    }
+
+    @Override
+    public Collection<RowData> lookup(RowData keyRow) throws IOException {
+        if (!retryEnabled) {
+            return userLookupFunction.lookup(keyRow);
+        }
+        for (int attemptNumber = 1; ; attemptNumber++) {
+            Collection<RowData> result = userLookupFunction.lookup(keyRow);
+            if (retryResultPredicate.test(result) && retryStrategy.canRetry(attemptNumber)) {
+                long backoff = retryStrategy.getBackoffTimeMillis(attemptNumber);
+                try {
+                    Thread.sleep(backoff);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                return result;
+            }
+        }
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        userLookupFunction.open(context);
+        retryResultPredicate =
+                retryStrategy.getRetryPredicate().resultPredicate().orElse(ignore -> false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        userLookupFunction.close();
+        super.close();
+    }
+}
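
For reference while reading the patch, the following is a minimal usage sketch of the new delegator, not part of the commit: it assumes only the constructor and the ResultRetryStrategy.fixedDelayRetry factory shown in this diff and in the test below. The anonymous LookupFunction and the main-method wiring are invented for illustration; in a real job the planner creates, opens and closes the wrapped function, and MockStreamingRuntimeContext is a test utility borrowed from the test below.

    import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
    import org.apache.flink.streaming.util.retryable.RetryPredicates;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.LookupFunction;
    import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
    import org.apache.flink.table.runtime.operators.join.lookup.RetryableLookupFunctionDelegator;

    import java.util.Collection;
    import java.util.Collections;

    public class RetryableDelegatorUsageSketch {
        public static void main(String[] args) throws Exception {
            // A trivial stand-in for a connector's LookupFunction (invented for this sketch):
            // it always returns an empty result, so every retry attempt will be used.
            LookupFunction userFunc =
                    new LookupFunction() {
                        @Override
                        public Collection<RowData> lookup(RowData keyRow) {
                            return Collections.emptyList();
                        }
                    };
            // Retry with a fixed 10 ms delay whenever the lookup result is empty,
            // the same strategy the test below uses.
            ResultRetryStrategy retryStrategy =
                    ResultRetryStrategy.fixedDelayRetry(
                            3, 10, RetryPredicates.EMPTY_RESULT_PREDICATE);
            LookupFunction retryable =
                    new RetryableLookupFunctionDelegator(userFunc, retryStrategy);
            // open() initializes the retry predicate; in a job the framework does this.
            retryable.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1)));
            Collection<RowData> rows = retryable.lookup(GenericRowData.of(1));
            System.out.println("rows after exhausting retries: " + rows.size());
            retryable.close();
        }
    }

With the empty-result predicate, the stand-in function exhausts its retry attempts and still returns the empty collection; callers only ever see the final result of the last attempt.
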
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableLookupFunctionDelegatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableLookupFunctionDelegatorTest.java
new file mode 100644
index 00000000000..5ac844be404
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RetryableLookupFunctionDelegatorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join;
+
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.streaming.util.retryable.RetryPredicates;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
+import org.apache.flink.table.runtime.operators.join.lookup.RetryableLookupFunctionDelegator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.data.StringData.fromString;
+
+/** Harness tests for {@link RetryableLookupFunctionDelegator}. */
+public class RetryableLookupFunctionDelegatorTest {
+
+    private final LookupFunction userLookupFunc = new TestingLookupFunction();
+
+    private final ResultRetryStrategy retryStrategy =
+            ResultRetryStrategy.fixedDelayRetry(3, 10, RetryPredicates.EMPTY_RESULT_PREDICATE);
+
+    private final RetryableLookupFunctionDelegator delegator =
+            new RetryableLookupFunctionDelegator(userLookupFunc, retryStrategy);
+
+    private static final Map<RowData, Collection<RowData>> data = new HashMap<>();
+
+    static {
+        data.put(
+                GenericRowData.of(1),
+                Collections.singletonList(GenericRowData.of(1, fromString("Julian"))));
+        data.put(
+                GenericRowData.of(3),
+                Arrays.asList(
+                        GenericRowData.of(3, fromString("Jark")),
+                        GenericRowData.of(3, fromString("Jackson"))));
+        data.put(
+                GenericRowData.of(4),
+                Collections.singletonList(GenericRowData.of(4, fromString("Fabian"))));
+    }
+
+    private final RowDataHarnessAssertor assertor =
+            new RowDataHarnessAssertor(
+                    new LogicalType[] {
+                        DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()
+                    });
+
+    @Test
+    public void testLookupWithRetry() throws Exception {
+        delegator.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1)));
+        for (int i = 1; i <= 5; i++) {
+            RowData key = GenericRowData.of(i);
+            assertor.assertOutputEquals(
+                    "output wrong",
+                    Collections.singleton(data.get(key)),
+                    Collections.singleton(delegator.lookup(key)));
+        }
+        delegator.close();
+    }
+
+    /**
+     * A testing {@link LookupFunction} that serves lookup requests from the static test data
+     * defined above.
+     */
+    public static final class TestingLookupFunction extends LookupFunction {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Collection<RowData> lookup(RowData keyRow) throws IOException {
+            return data.get(keyRow);
+        }
+    }
+}


[flink] 03/04: [hotfix][runtime] Do last attempt without successfully canceling the retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a2fc5ef34f563c906473cbe4bdd79a9d7eec48e
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Aug 9 17:53:04 2022 +0800

    [hotfix][runtime] Do last attempt without successfully canceling the retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator
    
    It is hard to reproduce this in runtime tests, but it occasionally happens in AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry of FLINK-28849. It would be better to add a separate test in the runtime module.
    
    This closes #20482
---
 .../flink/streaming/api/operators/async/AsyncWaitOperator.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index ba3f1c3ad87..0d88943b21e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -359,10 +359,10 @@ public class AsyncWaitOperator<IN, OUT>
             if (inFlightDelayRetryHandlers.size() > 0) {
                 for (RetryableResultHandlerDelegator delegator : inFlightDelayRetryHandlers) {
                     assert delegator.delayedRetryTimer != null;
-                    // cancel retry timer, cancel failure means retry action already being executed
-                    if (delegator.delayedRetryTimer.cancel(true)) {
-                        tryOnce(delegator);
-                    }
+                    // fire an attempt immediately, not relying on successfully canceling the retry
+                    // timer, for two reasons: 1. canceling the retry timer cannot be 100% safe 2.
+                    // there is protection against repeated retries
+                    tryOnce(delegator);
                 }
                 inFlightDelayRetryHandlers.clear();
             }
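
The new comment above relies on there being protection against repeated retries. To make that reasoning concrete, here is a small self-contained sketch of the completion-guard pattern that makes an unconditional last attempt safe. It is not the actual AsyncWaitOperator code and every name in it is invented: even if the delayed retry timer fires concurrently with the finish phase, only one of the two attempts can complete the element.

    import java.util.concurrent.atomic.AtomicBoolean;

    public class LastAttemptGuardSketch {
        // One flag per in-flight element, flipped exactly once when the element completes.
        private final AtomicBoolean completed = new AtomicBoolean(false);

        void tryOnce(String caller) {
            // compareAndSet ensures that a duplicate attempt (retry timer plus finish phase)
            // is a harmless no-op.
            if (completed.compareAndSet(false, true)) {
                System.out.println(caller + " performed the last attempt");
            } else {
                System.out.println(caller + " skipped: element already completed");
            }
        }

        public static void main(String[] args) {
            LastAttemptGuardSketch element = new LastAttemptGuardSketch();
            element.tryOnce("finish phase"); // fired unconditionally, as in the change above
            element.tryOnce("retry timer");  // a timer that fires anyway does no extra work
        }
    }
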