You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/12/29 03:14:07 UTC

[flink] branch master updated (94fba321d4e -> be6b1c94ef3)

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 94fba321d4e [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation
     new 62a9d837e24 [FLINK-28850][table-planner] pre-step: copy Snapshot node from calcite to support alias of LOOKUP hint
     new f32ba645246 [FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node
     new be6b1c94ef3 [FLINK-28850][table-planner] Support table alias in LOOKUP hint

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../content.zh/docs/dev/table/sql/queries/hints.md |   2 +-
 docs/content/docs/dev/table/sql/queries/hints.md   |   2 +-
 .../java/org/apache/calcite/rel/core/Snapshot.java | 134 +++++++++++
 .../apache/calcite/rel/hint/HintPredicates.java    |   7 +
 .../calcite/rel/hint/NodeTypeHintPredicate.java    |   6 +-
 .../calcite/rel/logical/LogicalSnapshot.java       |  34 ++-
 .../flink/table/planner/hint/FlinkHints.java       |   1 -
 .../nodes/exec/stream/LookupJoinJsonPlanTest.java  |  10 +-
 .../plan/stream/sql/join/LookupJoinTest.xml        | 260 +++++++++++++++++----
 .../plan/stream/sql/join/LookupJoinTest.scala      |  66 ++++--
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala |   6 +-
 .../runtime/stream/sql/LookupJoinITCase.scala      |   8 +-
 12 files changed, 452 insertions(+), 84 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java


[flink] 02/03: [FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f32ba645246a8180a02182650fb51392facfcc09
Author: lincoln lee <li...@gmail.com>
AuthorDate: Fri Sep 9 16:44:03 2022 +0800

    [FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node
    
    This closes #20800
---
 .../java/org/apache/calcite/rel/core/Snapshot.java | 42 ++++++++++++++++++----
 .../apache/calcite/rel/hint/HintPredicates.java    |  7 ++++
 .../calcite/rel/hint/NodeTypeHintPredicate.java    |  6 +++-
 .../calcite/rel/logical/LogicalSnapshot.java       | 34 ++++++++++++++++--
 4 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
index b47aa300d37..531f69345dc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
@@ -16,17 +16,19 @@
  */
 package org.apache.calcite.rel.core;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Litmus;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.List;
 import java.util.Objects;
@@ -40,12 +42,17 @@ import java.util.Objects;
  * Snapshot}(TableScan(Products)) is a relational operator that only returns the contents whose
  * versions that overlap with the given specific period (i.e. those that started before given period
  * and ended after it).
+ *
+ * <p>Temporarily copied from Calcite to cherry-pick [CALCITE-5251]; will be removed when upgrading
+ * to calcite-1.32.0.
  */
-public abstract class Snapshot extends SingleRel {
+public abstract class Snapshot extends SingleRel implements Hintable {
     // ~ Instance fields --------------------------------------------------------
 
     private final RexNode period;
 
+    protected final ImmutableList<RelHint> hints;
+
     // ~ Constructors -----------------------------------------------------------
 
     /**
@@ -53,16 +60,35 @@ public abstract class Snapshot extends SingleRel {
      *
      * @param cluster Cluster that this relational expression belongs to
      * @param traitSet The traits of this relational expression
+     * @param hints Hints for this node
      * @param input Input relational expression
      * @param period Timestamp expression that specifies the time in the past at which the table is viewed
      */
-    @SuppressWarnings("method.invocation.invalid")
-    protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+    protected Snapshot(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelHint> hints,
+            RelNode input,
+            RexNode period) {
         super(cluster, traitSet, input);
         this.period = Objects.requireNonNull(period, "period");
+        this.hints = ImmutableList.copyOf(hints);
         assert isValid(Litmus.THROW, null);
     }
 
+    /**
+     * Creates a Snapshot.
+     *
+     * @param cluster Cluster that this relational expression belongs to
+     * @param traitSet The traits of this relational expression
+     * @param input Input relational expression
+     * @param period Timestamp expression that specifies the time in the past at which the table is viewed
+     */
+    @SuppressWarnings("method.invocation.invalid")
+    protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+        this(cluster, traitSet, ImmutableList.of(), input, period);
+    }
+
     // ~ Methods ----------------------------------------------------------------
 
     @Override
@@ -72,7 +98,6 @@ public abstract class Snapshot extends SingleRel {
 
     public abstract Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period);
 
-    @Override
     public RelNode accept(RexShuttle shuttle) {
         RexNode condition = shuttle.apply(this.period);
         if (this.period == condition) {
@@ -91,7 +116,7 @@ public abstract class Snapshot extends SingleRel {
     }
 
     @Override
-    public boolean isValid(Litmus litmus, @Nullable Context context) {
+    public boolean isValid(Litmus litmus, RelNode.Context context) {
         RelDataType dataType = period.getType();
         if (dataType.getSqlTypeName() != SqlTypeName.TIMESTAMP) {
             return litmus.fail(
@@ -101,4 +126,9 @@ public abstract class Snapshot extends SingleRel {
         }
         return litmus.succeed();
     }
+
+    @Override
+    public ImmutableList<RelHint> getHints() {
+        return hints;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
index a38ad2e141e..7081c68041a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
@@ -109,6 +109,13 @@ public abstract class HintPredicates {
     public static final HintPredicate WINDOW =
             new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.WINDOW);
 
+    /**
+     * A hint predicate that indicates a hint can only be used to {@link
+     * org.apache.calcite.rel.core.Snapshot} nodes.
+     */
+    public static final HintPredicate SNAPSHOT =
+            new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.SNAPSHOT);
+
     /**
      * Returns a composed hint predicate that represents a short-circuiting logical AND of an array
      * of hint predicates {@code hintPredicates}. When evaluating the composed predicate, if a
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
index 4ed6d117c09..9fce804833e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Snapshot;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Values;
@@ -78,7 +79,10 @@ public class NodeTypeHintPredicate implements HintPredicate {
         VALUES(Values.class),
 
         /** The hint would be propagated to the Window nodes. */
-        WINDOW(Window.class);
+        WINDOW(Window.class),
+
+        /** The hint would be propagated to the Snapshot nodes. */
+        SNAPSHOT(Snapshot.class);
 
         /** Relational expression clazz that the hint can apply to. */
         private Class<?> relClazz;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
index d32e0924dd4..f2e974d3dd3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
@@ -18,6 +18,7 @@
 
 package org.apache.calcite.rel.logical;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -25,6 +26,7 @@ import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelDistributionTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.metadata.RelMdCollation;
 import org.apache.calcite.rel.metadata.RelMdDistribution;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -32,16 +34,37 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Litmus;
 
+import java.util.List;
+
 /**
  * Sub-class of {@link org.apache.calcite.rel.core.Snapshot} not targeted at any particular engine
  * or calling convention. The class was copied over because of CALCITE-4554.
  *
- * <p>Line 80 ~ 91: Calcite only supports timestamp type as period type, but Flink supports both
+ * <p>Line 106 ~ 117: Calcite only supports timestamp type as period type, but Flink supports both
  * Timestamp and TimestampLtz. Should be removed once Calcite supports TimestampLtz as period type.
  */
 public class LogicalSnapshot extends Snapshot {
 
     // ~ Constructors -----------------------------------------------------------
+    /**
+     * Creates a LogicalSnapshot.
+     *
+     * <p>Use {@link #create} unless you know what you're doing.
+     *
+     * @param cluster Cluster that this relational expression belongs to
+     * @param traitSet The traits of this relational expression
+     * @param hints Hints for this node
+     * @param input Input relational expression
+     * @param period Timestamp expression that specifies the time in the past at which the table is viewed
+     */
+    public LogicalSnapshot(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelHint> hints,
+            RelNode input,
+            RexNode period) {
+        super(cluster, traitSet, hints, input, period);
+    }
 
     /**
      * Creates a LogicalSnapshot.
@@ -55,12 +78,12 @@ public class LogicalSnapshot extends Snapshot {
      */
     public LogicalSnapshot(
             RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
-        super(cluster, traitSet, input, period);
+        super(cluster, traitSet, ImmutableList.of(), input, period);
     }
 
     @Override
     public Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period) {
-        return new LogicalSnapshot(getCluster(), traitSet, input, period);
+        return new LogicalSnapshot(getCluster(), traitSet, hints, input, period);
     }
 
     /** Creates a LogicalSnapshot. */
@@ -93,4 +116,9 @@ public class LogicalSnapshot extends Snapshot {
         }
         return litmus.succeed();
     }
+
+    @Override
+    public RelNode withHints(final List<RelHint> hintList) {
+        return new LogicalSnapshot(getCluster(), traitSet, hintList, input, getPeriod());
+    }
 }


[flink] 03/03: [FLINK-28850][table-planner] Support table alias in LOOKUP hint

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be6b1c94ef3f552c753746863cb0a4e7dd86d2fc
Author: lincoln lee <li...@gmail.com>
AuthorDate: Fri Sep 9 17:48:04 2022 +0800

    [FLINK-28850][table-planner] Support table alias in LOOKUP hint
    
    This closes #20800
---
 .../content.zh/docs/dev/table/sql/queries/hints.md |   2 +-
 docs/content/docs/dev/table/sql/queries/hints.md   |   2 +-
 .../flink/table/planner/hint/FlinkHints.java       |   1 -
 .../nodes/exec/stream/LookupJoinJsonPlanTest.java  |  10 +-
 .../plan/stream/sql/join/LookupJoinTest.xml        | 260 +++++++++++++++++----
 .../plan/stream/sql/join/LookupJoinTest.scala      |  66 ++++--
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala |   6 +-
 .../runtime/stream/sql/LookupJoinITCase.scala      |   8 +-
 8 files changed, 275 insertions(+), 80 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/sql/queries/hints.md b/docs/content.zh/docs/dev/table/sql/queries/hints.md
index 9e6ce8b3cce..7cfec52207b 100644
--- a/docs/content.zh/docs/dev/table/sql/queries/hints.md
+++ b/docs/content.zh/docs/dev/table/sql/queries/hints.md
@@ -331,7 +331,7 @@ LOOKUP 联接提示允许用户建议 Flink 优化器:
 
 {{< hint info >}}
 注意:其中 
-- 'table' 是必选项,需要填写目标联接表的表名(和 FROM 子句引用的表名保持一致),注意当前不支持填写表的别名(这将在后续版本中支持)。
+- 'table' 是必选项,需要填写目标联接表的表名(和 FROM 子句引用的表名保持一致),注意如果表定义了别名,则提示选项必须使用别名。
 - 异步查找参数可按需设置一个或多个,未设置的参数按默认值生效。
 - 重试查找参数没有默认值,在需要开启时所有参数都必须设置为有效值。
 {{< /hint >}}
diff --git a/docs/content/docs/dev/table/sql/queries/hints.md b/docs/content/docs/dev/table/sql/queries/hints.md
index 5cc8723112e..33eebcfb75a 100644
--- a/docs/content/docs/dev/table/sql/queries/hints.md
+++ b/docs/content/docs/dev/table/sql/queries/hints.md
@@ -338,7 +338,7 @@ The LOOKUP hint allows users to suggest the Flink optimizer to:
 
 {{< hint info >}}
 Note: 
-- 'table' option is required, only table name is supported(keep consistent with which in the FROM clause), alias name is not supported currently(will be supported in later versions).
+- 'table' option is required and must match the table name used in the FROM clause; note that if the table has an alias, the alias must be used in the hint instead.
 - async options are all optional, will use default value if not configured.
 - there is no default value for retry options, all retry options should be set to valid values when need to enable retry.
 {{< /hint >}}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
index 7463ac22893..4e8ce7a7fcb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
@@ -114,7 +114,6 @@ public abstract class FlinkHints {
     public static boolean canTransposeToTableScan(RelNode node) {
         return node instanceof LogicalProject // computed column on table
                 || node instanceof LogicalFilter
-                // TODO support lookup join hint with alias name in FLINK-28850
                 || node instanceof LogicalSnapshot;
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
index 05b9200280a..a68eee6357a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java
@@ -201,7 +201,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
         // LookupTable has sync func only, just verify the hint has taken effect
         util.verifyJsonPlan(
                 "INSERT INTO MySink1 SELECT "
-                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'output-mode'='allow_unordered') */ * "
+                        + "/*+ LOOKUP('table'='D', 'async'='true', 'output-mode'='allow_unordered') */ * "
                         + "FROM MyTable AS T JOIN LookupTable "
                         + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
     }
@@ -211,7 +211,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
         // LookupTable has sync func only, just verify the hint has taken effect
         util.verifyJsonPlan(
                 "INSERT INTO MySink1 SELECT "
-                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000') */ * "
+                        + "/*+ LOOKUP('table'='D', 'async'='true', 'timeout'='600s', 'capacity'='1000') */ * "
                         + "FROM MyTable AS T JOIN LookupTable "
                         + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
     }
@@ -220,7 +220,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
     public void testJoinTemporalTableWithRetryHint() {
         util.verifyJsonPlan(
                 "INSERT INTO MySink1 SELECT "
-                        + "/*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
+                        + "/*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
                         + "FROM MyTable AS T JOIN LookupTable "
                         + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
     }
@@ -230,7 +230,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
         // LookupTable has sync func only, just verify the hint has taken effect
         util.verifyJsonPlan(
                 "INSERT INTO MySink1 SELECT "
-                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
+                        + "/*+ LOOKUP('table'='D', 'async'='true', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
                         + "FROM MyTable AS T JOIN LookupTable "
                         + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
     }
@@ -240,7 +240,7 @@ public class LookupJoinJsonPlanTest extends TableTestBase {
         // LookupTable has sync func only, just verify the hint has taken effect
         util.verifyJsonPlan(
                 "INSERT INTO MySink1 SELECT "
-                        + "/*+ LOOKUP('table'='LookupTable', 'async'='true', 'timeout'='600s', 'capacity'='1000', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
+                        + "/*+ LOOKUP('table'='D', 'async'='true', 'timeout'='600s', 'capacity'='1000', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * "
                         + "FROM MyTable AS T JOIN LookupTable "
                         + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
     }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index c0626e94636..90ab534634e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -536,12 +536,12 @@ GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS EXPR$1, SUM_RETRACT(c
   </TestCase>
   <TestCase name="testJoinAsyncTableWithAsyncHint[LegacyTableSource=false]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=AsyncLookupTable}]]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
@@ -558,12 +558,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   </TestCase>
   <TestCase name="testJoinAsyncTableWithAsyncHint[LegacyTableSource=true]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=AsyncLookupTable}]]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
@@ -580,12 +580,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   </TestCase>
   <TestCase name="testJoinAsyncTableWithSyncHint[LegacyTableSource=false]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=AsyncLookupTable}]]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=D}]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
@@ -602,12 +602,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   </TestCase>
   <TestCase name="testJoinAsyncTableWithSyncHint[LegacyTableSource=true]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='false') */ * FROM MyTable AS T JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=AsyncLookupTable}]]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=false, table=D}]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
@@ -619,12 +619,56 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
 +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[ORDERED, 180000ms, 100])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithTableAlias[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithTableAlias[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=false]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime ON T.a = LookupTable.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -633,7 +677,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -646,7 +690,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   </TestCase>
   <TestCase name="testJoinHintWithTableNameOnly[LegacyTableSource=true]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime ON T.a = LookupTable.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
@@ -655,7 +699,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -668,12 +712,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   </TestCase>
   <TestCase name="testJoinSyncTableWithAsyncHint[LegacyTableSource=false]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=LookupTable}]]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
@@ -690,12 +734,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   </TestCase>
   <TestCase name="testJoinSyncTableWithAsyncHint[LegacyTableSource=true]">
     <Resource name="sql">
-      <![CDATA[SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+      <![CDATA[SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=LookupTable}]]])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{async=true, table=D}]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
@@ -787,7 +831,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
 +- LogicalProject(a=[$0], name=[$6], age=[$7])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=AsyncLookupTable, capacity=300}]]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=D, capacity=300}]]])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
       +- LogicalFilter(condition=[=($cor0.a, $0)])
          +- LogicalSnapshot(period=[$cor0.proctime])
@@ -1490,7 +1534,7 @@ Calc(select=[a, b, c, name])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
 +- LogicalProject(a=[$0], name=[$6], age=[$7])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=AsyncLookupTable, capacity=300}]]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3, output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss, table=D, capacity=300}]]])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
       +- LogicalFilter(condition=[=($cor0.a, $0)])
          +- LogicalSnapshot(period=[$cor0.proctime])
@@ -1582,7 +1626,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
 +- LogicalProject(a=[$0], name=[$6], age=[$7])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=AsyncLookupTable, capacity=300}]]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=D, capacity=300}]]])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
       +- LogicalFilter(condition=[=($cor0.a, $0)])
          +- LogicalSnapshot(period=[$cor0.proctime])
@@ -1699,7 +1743,7 @@ Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS proctime, id, name, age])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
 +- LogicalProject(a=[$0], name=[$6], age=[$7])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=AsyncLookupTable, capacity=300}]]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{time-out=600s, output-mode=allow_unordered, table=D, capacity=300}]]])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
       +- LogicalFilter(condition=[=($cor0.a, $0)])
          +- LogicalSnapshot(period=[$cor0.proctime])
@@ -1791,7 +1835,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
 +- LogicalProject(a=[$0], name=[$6], age=[$7])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
       +- LogicalFilter(condition=[=($cor0.a, $0)])
          +- LogicalSnapshot(period=[$cor0.proctime])
@@ -1883,7 +1927,7 @@ Sink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
       <![CDATA[== Abstract Syntax Tree ==
 LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name, age])
 +- LogicalProject(a=[$0], name=[$6], age=[$7])
-   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D}]]])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
       +- LogicalFilter(condition=[=($cor0.a, $0)])
          +- LogicalSnapshot(period=[$cor0.proctime])
@@ -1995,11 +2039,11 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   <TestCase name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=true]">
     <Resource name="sql">
       <![CDATA[
-SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), 
-           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * 
-FROM MyTable AS T 
-JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
- ON T.a = D.id
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime
+ ON T.a = AsyncLookupTable.id
       ]]>
     </Resource>
     <Resource name="ast">
@@ -2009,7 +2053,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -2042,12 +2086,12 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=false]">
+  <TestCase name="testMultipleJoinHintsWithDifferentTableAlias[LegacyTableSource=false]">
     <Resource name="sql">
       <![CDATA[
-SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), 
-           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * 
-FROM MyTable AS T 
+SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='D1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+FROM MyTable AS T
 JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
   ON T.a = D.id
 JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 
@@ -2057,8 +2101,8 @@ JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
-+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
-   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]])
+   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    :  +- LogicalFilter(condition=[=($cor0.a, $0)])
    :     +- LogicalSnapshot(period=[$cor0.proctime])
@@ -2071,6 +2115,41 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age, id0, name0, age0])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime
+  ON T.a = AsyncLookupTable.id
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime
+  ON T.a = LookupTable.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   :  +- LogicalFilter(condition=[=($cor0.a, $0)])
+   :     +- LogicalSnapshot(period=[$cor0.proctime])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]])
+   +- LogicalFilter(condition=[=($cor1.a, $0)])
+      +- LogicalSnapshot(period=[$cor1.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age, id0, name0, age0])
 +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
    +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -2080,9 +2159,100 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   <TestCase name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=true]">
     <Resource name="sql">
       <![CDATA[
-SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), 
-           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * 
-FROM MyTable AS T 
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime
+  ON T.a = AsyncLookupTable.id
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime
+  ON T.a = LookupTable.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   :  +- LogicalFilter(condition=[=($cor0.a, $0)])
+   :     +- LogicalSnapshot(period=[$cor0.proctime])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]])
+   +- LogicalFilter(condition=[=($cor1.a, $0)])
+      +- LogicalSnapshot(period=[$cor1.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable, source: [TestTemporalTable(id, name, age)]]], hints=[[[ALIAS inheritPath:[] options:[LookupTable]]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age, id0, name0, age0])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultipleJoinHintsWithSameTableAlias[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='D', 'output-mode'='ordered') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+ ON T.a = D.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{output-mode=ordered, table=D}]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultipleJoinHintsWithSameTableAlias[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='D', 'output-mode'='ordered') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+ ON T.a = D.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{output-mode=ordered, table=D}]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age], async=[UNORDERED, 180000ms, 100])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultipleJoinHintsWithDifferentTableAlias[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='D1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+FROM MyTable AS T
 JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
   ON T.a = D.id
 JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 
@@ -2092,8 +2262,8 @@ JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
-+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
-   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]])
+   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] options:{output-mode=allow_unordered, table=D}][LOOKUP inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s, retry-predicate=lookup_miss, table=D1}]]])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    :  +- LogicalFilter(condition=[=($cor0.a, $0)])
    :     +- LogicalSnapshot(period=[$cor0.proctime])
@@ -2115,11 +2285,11 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
   <TestCase name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=false]">
     <Resource name="sql">
       <![CDATA[
-SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), 
-           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * 
-FROM MyTable AS T 
-JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
- ON T.a = D.id
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'),
+           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime
+ ON T.a = AsyncLookupTable.id
       ]]>
     </Resource>
     <Resource name="ast">
@@ -2129,7 +2299,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], nam
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
    +- LogicalFilter(condition=[=($cor0.a, $0)])
       +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, AsyncLookupTable]], hints=[[[ALIAS inheritPath:[] options:[AsyncLookupTable]]]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index f793bc68e73..ac72487ad98 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -750,20 +750,16 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
 
   @Test
   def testJoinHintWithTableAlias(): Unit = {
-    // TODO to be supported in FLINK-28850 (to make LogicalSnapshot Hintable)
-    thrown.expectMessage(
-      "The options of following hints cannot match the name of input tables or" +
-        " views: \n`D` in `LOOKUP`")
-    thrown.expect(classOf[ValidationException])
-    val sql = "SELECT /*+ LOOKUP('table'='D') */ * FROM MyTable AS T JOIN LookupTable " +
-      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
+    val sql =
+      "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * FROM MyTable AS T JOIN LookupTable " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
     util.verifyExecPlan(sql)
   }
 
   @Test
   def testJoinHintWithTableNameOnly(): Unit = {
     val sql = "SELECT /*+ LOOKUP('table'='LookupTable') */ * FROM MyTable AS T JOIN LookupTable " +
-      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
+      "FOR SYSTEM_TIME AS OF T.proctime ON T.a = LookupTable.id"
     util.verifyExecPlan(sql)
   }
 
@@ -772,9 +768,23 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
     // only the first hint will take effect
     val sql =
       """
-        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), 
-        |           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * 
-        |FROM MyTable AS T 
+        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'),
+        |           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ *
+        |FROM MyTable AS T
+        |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime
+        | ON T.a = AsyncLookupTable.id
+      """.stripMargin
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testMultipleJoinHintsWithSameTableAlias(): Unit = {
+    // only the first hint will take effect
+    val sql =
+      """
+        |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'),
+        |           LOOKUP('table'='D', 'output-mode'='ordered') */ *
+        |FROM MyTable AS T
         |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
         | ON T.a = D.id
       """.stripMargin
@@ -786,9 +796,25 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
     // both hints on corresponding tables will take effect
     val sql =
       """
-        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'), 
-        |           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * 
-        |FROM MyTable AS T 
+        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered'),
+        |           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+        |FROM MyTable AS T
+        |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime
+        |  ON T.a = AsyncLookupTable.id
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime
+        |  ON T.a = LookupTable.id
+      """.stripMargin
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testMultipleJoinHintsWithDifferentTableAlias(): Unit = {
+    // both hints on corresponding tables will take effect
+    val sql =
+      """
+        |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered'),
+        |           LOOKUP('table'='D1', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+        |FROM MyTable AS T
         |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
         |  ON T.a = D.id
         |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 
@@ -800,7 +826,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
   @Test
   def testJoinSyncTableWithAsyncHint(): Unit = {
     val sql =
-      "SELECT /*+ LOOKUP('table'='LookupTable', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable " +
+      "SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * FROM MyTable AS T JOIN LookupTable " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
     util.verifyExecPlan(sql)
   }
@@ -808,7 +834,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
   @Test
   def testJoinAsyncTableWithAsyncHint(): Unit = {
     val sql =
-      "SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='true') */ * " +
+      "SELECT /*+ LOOKUP('table'='D', 'async'='true') */ * " +
         "FROM MyTable AS T JOIN AsyncLookupTable " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
     util.verifyExecPlan(sql)
@@ -817,7 +843,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
   @Test
   def testJoinAsyncTableWithSyncHint(): Unit = {
     val sql =
-      "SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'async'='false') */ * " +
+      "SELECT /*+ LOOKUP('table'='D', 'async'='false') */ * " +
         "FROM MyTable AS T JOIN AsyncLookupTable " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id"
     util.verifyExecPlan(sql)
@@ -883,7 +909,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
     stmt.addInsertSql(
       """
         |INSERT INTO Sink1
-        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300') */
+        |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300') */
         | T.a, D.name, D.age
         |FROM MyTable T
         |JOIN AsyncLookupTable
@@ -899,7 +925,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
     stmt.addInsertSql(
       """
         |INSERT INTO Sink1
-        |SELECT /*+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
+        |SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
         | T.a, D.name, D.age
         |FROM MyTable T
         |JOIN LookupTable
@@ -915,7 +941,7 @@ class LookupJoinTest(legacyTableSource: Boolean) extends TableTestBase with Seri
     stmt.addInsertSql(
       """
         |INSERT INTO Sink1
-        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
+        |SELECT /*+ LOOKUP('table'='D', 'output-mode'='allow_unordered', 'time-out'='600s', 'capacity'='300', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */
         | T.a, D.name, D.age
         |FROM MyTable T
         |JOIN AsyncLookupTable
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 864c01527bd..c2e8e2e606b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -444,7 +444,7 @@ class AsyncLookupJoinITCase(
 
   @Test
   def testAsyncJoinTemporalTableWithRetry(): Unit = {
-    val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table", 2)
+    val maxRetryTwiceHint = getAsyncRetryLookupHint("D", 2)
     val sink = new TestingAppendSink
     tEnv
       .sqlQuery(s"""
@@ -463,7 +463,7 @@ class AsyncLookupJoinITCase(
 
   @Test
   def testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = {
-    val maxRetryOnceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold3", 1)
+    val maxRetryOnceHint = getAsyncRetryLookupHint("D", 1)
     val sink = new TestingAppendSink
     tEnv
       .sqlQuery(s"""
@@ -492,7 +492,7 @@ class AsyncLookupJoinITCase(
     // retry work, but due the fast finish of testing bounded source, it has no assurance of the
     // max attempts number, it only ensures at least one retry for each element in current version
     // so we can only use a max lookup threshold to 2 to get a deterministic results
-    val maxRetryTwiceHint = getAsyncRetryLookupHint("user_table_with_lookup_threshold2", 2)
+    val maxRetryTwiceHint = getAsyncRetryLookupHint("D", 2)
 
     val sink = new TestingAppendSink
     tEnv
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index 39efec4a146..08dec3d4321 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -740,7 +740,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
 
   @Test
   def testJoinTemporalTableWithRetry(): Unit = {
-    val maxRetryTwiceHint = getRetryLookupHint("user_table", 2)
+    val maxRetryTwiceHint = getRetryLookupHint("D", 2)
     val sink = new TestingAppendSink
     tEnv
       .sqlQuery(s"""
@@ -759,7 +759,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
 
   @Test
   def testJoinTemporalTableWithLookupThresholdWithInsufficientRetry(): Unit = {
-    val maxRetryOnceHint = getRetryLookupHint("user_table_with_lookup_threshold3", 1)
+    val maxRetryOnceHint = getRetryLookupHint("D", 1)
     val sink = new TestingAppendSink
     tEnv
       .sqlQuery(s"""
@@ -783,7 +783,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
 
   @Test
   def testJoinTemporalTableWithLookupThresholdWithSufficientRetry(): Unit = {
-    val maxRetryTwiceHint = getRetryLookupHint("user_table_with_lookup_threshold2", 2)
+    val maxRetryTwiceHint = getRetryLookupHint("D", 2)
 
     val sink = new TestingAppendSink
     tEnv
@@ -803,7 +803,7 @@ class LookupJoinITCase(legacyTableSource: Boolean, cacheType: LookupCacheType)
   @Test
   def testJoinTemporalTableWithLookupThresholdWithLargerRetry(): Unit = {
     // max times beyond the lookup threshold of 'user_table_with_lookup_threshold2'
-    val largerRetryHint = getRetryLookupHint("user_table_with_lookup_threshold2", 10)
+    val largerRetryHint = getRetryLookupHint("D", 10)
 
     val sink = new TestingAppendSink
     tEnv


[flink] 01/03: [FLINK-28850][table-planner] pre-step: copy Snapshot node from calcite to support alias of LOOKUP hint

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 62a9d837e24150461790e7689659c50f15197ebc
Author: lincoln lee <li...@gmail.com>
AuthorDate: Wed Dec 28 11:10:20 2022 +0800

    [FLINK-28850][table-planner] pre-step: copy Snapshot node from calcite to support alias of LOOKUP hint
    
    This closes #20800
---
 .../java/org/apache/calcite/rel/core/Snapshot.java | 104 +++++++++++++++++++++
 1 file changed, 104 insertions(+)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
new file mode 100644
index 00000000000..b47aa300d37
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.core;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Litmus;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Relational expression that returns the contents of a relational expression as it was at a given
+ * time in the past.
+ *
+ * <p>For example, if {@code Products} is a temporal table, and {@link TableScan}(Products) is a
+ * relational operator that returns all versions of the contents of the table, then {@link
+ * Snapshot}(TableScan(Products)) is a relational operator that only returns the contents whose
+ * versions overlap with the given period (i.e. those that started before the given period and
+ * ended after it).
+ */
+public abstract class Snapshot extends SingleRel {
+    // ~ Instance fields --------------------------------------------------------
+
+    private final RexNode period;
+
+    // ~ Constructors -----------------------------------------------------------
+
+    /**
+     * Creates a Snapshot.
+     *
+     * @param cluster Cluster that this relational expression belongs to
+     * @param traitSet The traits of this relational expression
+     * @param input Input relational expression
+     * @param period Timestamp expression giving the time in the past at which the table is read
+     */
+    @SuppressWarnings("method.invocation.invalid")
+    protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+        super(cluster, traitSet, input);
+        this.period = Objects.requireNonNull(period, "period");
+        assert isValid(Litmus.THROW, null);
+    }
+
+    // ~ Methods ----------------------------------------------------------------
+
+    @Override
+    public final RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return copy(traitSet, sole(inputs), getPeriod());
+    }
+
+    public abstract Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period);
+
+    @Override
+    public RelNode accept(RexShuttle shuttle) {
+        RexNode condition = shuttle.apply(this.period);
+        if (this.period == condition) {
+            return this;
+        }
+        return copy(traitSet, getInput(), condition);
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw).item("period", period);
+    }
+
+    public RexNode getPeriod() {
+        return period;
+    }
+
+    @Override
+    public boolean isValid(Litmus litmus, @Nullable Context context) {
+        RelDataType dataType = period.getType();
+        if (dataType.getSqlTypeName() != SqlTypeName.TIMESTAMP) {
+            return litmus.fail(
+                    "The system time period specification expects Timestamp type but is '"
+                            + dataType.getSqlTypeName()
+                            + "'");
+        }
+        return litmus.succeed();
+    }
+}