You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/12/29 03:14:09 UTC

[flink] 02/03: [FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f32ba645246a8180a02182650fb51392facfcc09
Author: lincoln lee <li...@gmail.com>
AuthorDate: Fri Sep 9 16:44:03 2022 +0800

    [FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node
    
    This closes #20800
---
 .../java/org/apache/calcite/rel/core/Snapshot.java | 42 ++++++++++++++++++----
 .../apache/calcite/rel/hint/HintPredicates.java    |  7 ++++
 .../calcite/rel/hint/NodeTypeHintPredicate.java    |  6 +++-
 .../calcite/rel/logical/LogicalSnapshot.java       | 34 ++++++++++++++++--
 4 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
index b47aa300d37..531f69345dc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
@@ -16,17 +16,19 @@
  */
 package org.apache.calcite.rel.core;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Litmus;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.List;
 import java.util.Objects;
@@ -40,12 +42,17 @@ import java.util.Objects;
  * Snapshot}(TableScan(Products)) is a relational operator that only returns the contents whose
  * versions that overlap with the given specific period (i.e. those that started before given period
  * and ended after it).
+ *
+ * <p>Temporarily copied from Calcite to cherry-pick [CALCITE-5251]; to be removed when upgrading
+ * to Calcite 1.32.0.
  */
-public abstract class Snapshot extends SingleRel {
+public abstract class Snapshot extends SingleRel implements Hintable {
     // ~ Instance fields --------------------------------------------------------
 
     private final RexNode period;
 
+    protected final ImmutableList<RelHint> hints;
+
     // ~ Constructors -----------------------------------------------------------
 
     /**
@@ -53,16 +60,35 @@ public abstract class Snapshot extends SingleRel {
      *
      * @param cluster Cluster that this relational expression belongs to
      * @param traitSet The traits of this relational expression
+     * @param hints Hints for this node
      * @param input Input relational expression
      * @param period Timestamp expression which as the table was at the given time in the past
      */
-    @SuppressWarnings("method.invocation.invalid")
-    protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+    protected Snapshot(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelHint> hints,
+            RelNode input,
+            RexNode period) {
         super(cluster, traitSet, input);
         this.period = Objects.requireNonNull(period, "period");
+        this.hints = ImmutableList.copyOf(hints);
         assert isValid(Litmus.THROW, null);
     }
 
+    /**
+     * Creates a Snapshot.
+     *
+     * @param cluster Cluster that this relational expression belongs to
+     * @param traitSet The traits of this relational expression
+     * @param input Input relational expression
+     * @param period Timestamp expression for the past point in time at which the table is viewed
+     */
+    @SuppressWarnings("method.invocation.invalid")
+    protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+        this(cluster, traitSet, ImmutableList.of(), input, period);
+    }
+
     // ~ Methods ----------------------------------------------------------------
 
     @Override
@@ -72,7 +98,6 @@ public abstract class Snapshot extends SingleRel {
 
     public abstract Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period);
 
-    @Override
     public RelNode accept(RexShuttle shuttle) {
         RexNode condition = shuttle.apply(this.period);
         if (this.period == condition) {
@@ -91,7 +116,7 @@ public abstract class Snapshot extends SingleRel {
     }
 
     @Override
-    public boolean isValid(Litmus litmus, @Nullable Context context) {
+    public boolean isValid(Litmus litmus, RelNode.Context context) {
         RelDataType dataType = period.getType();
         if (dataType.getSqlTypeName() != SqlTypeName.TIMESTAMP) {
             return litmus.fail(
@@ -101,4 +126,9 @@ public abstract class Snapshot extends SingleRel {
         }
         return litmus.succeed();
     }
+
+    @Override
+    public ImmutableList<RelHint> getHints() {
+        return hints;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
index a38ad2e141e..7081c68041a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
@@ -109,6 +109,13 @@ public abstract class HintPredicates {
     public static final HintPredicate WINDOW =
             new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.WINDOW);
 
+    /**
+     * A hint predicate that indicates a hint can only be applied to {@link
+     * org.apache.calcite.rel.core.Snapshot} nodes.
+     */
+    public static final HintPredicate SNAPSHOT =
+            new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.SNAPSHOT);
+
     /**
      * Returns a composed hint predicate that represents a short-circuiting logical AND of an array
      * of hint predicates {@code hintPredicates}. When evaluating the composed predicate, if a
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
index 4ed6d117c09..9fce804833e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Snapshot;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Values;
@@ -78,7 +79,10 @@ public class NodeTypeHintPredicate implements HintPredicate {
         VALUES(Values.class),
 
         /** The hint would be propagated to the Window nodes. */
-        WINDOW(Window.class);
+        WINDOW(Window.class),
+
+        /** The hint would be propagated to the Snapshot nodes. */
+        SNAPSHOT(Snapshot.class);
 
         /** Relational expression clazz that the hint can apply to. */
         private Class<?> relClazz;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
index d32e0924dd4..f2e974d3dd3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
@@ -18,6 +18,7 @@
 
 package org.apache.calcite.rel.logical;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -25,6 +26,7 @@ import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelDistributionTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.metadata.RelMdCollation;
 import org.apache.calcite.rel.metadata.RelMdDistribution;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -32,16 +34,37 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Litmus;
 
+import java.util.List;
+
 /**
  * Sub-class of {@link org.apache.calcite.rel.core.Snapshot} not targeted at any particular engine
  * or calling convention. The class was copied over because of * CALCITE-4554. *
  *
- * <p>Line 80 ~ 91: Calcite only supports timestamp type as period type, but Flink supports both
+ * <p>Line 106 ~ 117: Calcite only supports timestamp type as period type, but Flink supports both
  * Timestamp and TimestampLtz. Should be removed once calcite support TimestampLtz as period type.
  */
 public class LogicalSnapshot extends Snapshot {
 
     // ~ Constructors -----------------------------------------------------------
+    /**
+     * Creates a LogicalSnapshot.
+     *
+     * <p>Use {@link #create} unless you know what you're doing.
+     *
+     * @param cluster Cluster that this relational expression belongs to
+     * @param traitSet The traits of this relational expression
+     * @param hints Hints for this node
+     * @param input Input relational expression
+     * @param period Timestamp expression for the past point in time at which the table is viewed
+     */
+    public LogicalSnapshot(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelHint> hints,
+            RelNode input,
+            RexNode period) {
+        super(cluster, traitSet, hints, input, period);
+    }
 
     /**
      * Creates a LogicalSnapshot.
@@ -55,12 +78,12 @@ public class LogicalSnapshot extends Snapshot {
      */
     public LogicalSnapshot(
             RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
-        super(cluster, traitSet, input, period);
+        super(cluster, traitSet, ImmutableList.of(), input, period);
     }
 
     @Override
     public Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period) {
-        return new LogicalSnapshot(getCluster(), traitSet, input, period);
+        return new LogicalSnapshot(getCluster(), traitSet, hints, input, period);
     }
 
     /** Creates a LogicalSnapshot. */
@@ -93,4 +116,9 @@ public class LogicalSnapshot extends Snapshot {
         }
         return litmus.succeed();
     }
+
+    @Override
+    public RelNode withHints(final List<RelHint> hintList) {
+        return new LogicalSnapshot(getCluster(), traitSet, hintList, input, getPeriod());
+    }
 }