You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/12/29 03:14:09 UTC
[flink] 02/03: [FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f32ba645246a8180a02182650fb51392facfcc09
Author: lincoln lee <li...@gmail.com>
AuthorDate: Fri Sep 9 16:44:03 2022 +0800
[FLINK-28850][table-planner] cherry-pick CALCITE-5251 to support hint for Snapshot node
This closes #20800
---
.../java/org/apache/calcite/rel/core/Snapshot.java | 42 ++++++++++++++++++----
.../apache/calcite/rel/hint/HintPredicates.java | 7 ++++
.../calcite/rel/hint/NodeTypeHintPredicate.java | 6 +++-
.../calcite/rel/logical/LogicalSnapshot.java | 34 ++++++++++++++++--
4 files changed, 79 insertions(+), 10 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
index b47aa300d37..531f69345dc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/core/Snapshot.java
@@ -16,17 +16,19 @@
*/
package org.apache.calcite.rel.core;
+import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Litmus;
-import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.List;
import java.util.Objects;
@@ -40,12 +42,17 @@ import java.util.Objects;
* Snapshot}(TableScan(Products)) is a relational operator that only returns the contents whose
* versions overlap with the given specific period (i.e. those that started before the given period
* and ended after it).
+ *
+ * <p>Temporarily copied from Calcite to cherry-pick [CALCITE-5251]; it will be removed when
+ * upgrading to calcite-1.32.0.
*/
-public abstract class Snapshot extends SingleRel {
+public abstract class Snapshot extends SingleRel implements Hintable {
// ~ Instance fields --------------------------------------------------------
private final RexNode period;
+ protected final ImmutableList<RelHint> hints;
+
// ~ Constructors -----------------------------------------------------------
/**
@@ -53,16 +60,35 @@ public abstract class Snapshot extends SingleRel {
*
* @param cluster Cluster that this relational expression belongs to
* @param traitSet The traits of this relational expression
+ * @param hints Hints for this node
* @param input Input relational expression
* @param period Timestamp expression; the table is read as it was at that time in the past
*/
- @SuppressWarnings("method.invocation.invalid")
- protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+ protected Snapshot(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelHint> hints,
+ RelNode input,
+ RexNode period) {
super(cluster, traitSet, input);
this.period = Objects.requireNonNull(period, "period");
+ this.hints = ImmutableList.copyOf(hints);
assert isValid(Litmus.THROW, null);
}
+ /**
+ * Creates a Snapshot.
+ *
+ * @param cluster Cluster that this relational expression belongs to
+ * @param traitSet The traits of this relational expression
+ * @param input Input relational expression
+ * @param period Timestamp expression; the table is read as it was at that time in the past
+ */
+ @SuppressWarnings("method.invocation.invalid")
+ protected Snapshot(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+ this(cluster, traitSet, ImmutableList.of(), input, period);
+ }
+
// ~ Methods ----------------------------------------------------------------
@Override
@@ -72,7 +98,6 @@ public abstract class Snapshot extends SingleRel {
public abstract Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period);
- @Override
public RelNode accept(RexShuttle shuttle) {
RexNode condition = shuttle.apply(this.period);
if (this.period == condition) {
@@ -91,7 +116,7 @@ public abstract class Snapshot extends SingleRel {
}
@Override
- public boolean isValid(Litmus litmus, @Nullable Context context) {
+ public boolean isValid(Litmus litmus, RelNode.Context context) {
RelDataType dataType = period.getType();
if (dataType.getSqlTypeName() != SqlTypeName.TIMESTAMP) {
return litmus.fail(
@@ -101,4 +126,9 @@ public abstract class Snapshot extends SingleRel {
}
return litmus.succeed();
}
+
+ @Override
+ public ImmutableList<RelHint> getHints() {
+ return hints;
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
index a38ad2e141e..7081c68041a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/HintPredicates.java
@@ -109,6 +109,13 @@ public abstract class HintPredicates {
public static final HintPredicate WINDOW =
new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.WINDOW);
+ /**
+ * A hint predicate that indicates a hint can only be applied to {@link
+ * org.apache.calcite.rel.core.Snapshot} nodes.
+ */
+ public static final HintPredicate SNAPSHOT =
+ new NodeTypeHintPredicate(NodeTypeHintPredicate.NodeType.SNAPSHOT);
+
/**
* Returns a composed hint predicate that represents a short-circuiting logical AND of an array
* of hint predicates {@code hintPredicates}. When evaluating the composed predicate, if a
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
index 4ed6d117c09..9fce804833e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/hint/NodeTypeHintPredicate.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Snapshot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Values;
@@ -78,7 +79,10 @@ public class NodeTypeHintPredicate implements HintPredicate {
VALUES(Values.class),
/** The hint would be propagated to the Window nodes. */
- WINDOW(Window.class);
+ WINDOW(Window.class),
+
+ /** The hint would be propagated to the Snapshot nodes. */
+ SNAPSHOT(Snapshot.class);
/** Relational expression clazz that the hint can apply to. */
private Class<?> relClazz;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
index d32e0924dd4..f2e974d3dd3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
@@ -18,6 +18,7 @@
package org.apache.calcite.rel.logical;
+import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
@@ -25,6 +26,7 @@ import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMdCollation;
import org.apache.calcite.rel.metadata.RelMdDistribution;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
@@ -32,16 +34,37 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Litmus;
+import java.util.List;
+
/**
* Sub-class of {@link org.apache.calcite.rel.core.Snapshot} not targeted at any particular engine
* or calling convention. The class was copied over because of CALCITE-4554.
*
- * <p>Line 80 ~ 91: Calcite only supports timestamp type as period type, but Flink supports both
+ * <p>Line 106 ~ 117: Calcite only supports timestamp type as period type, but Flink supports both
* Timestamp and TimestampLtz. Should be removed once Calcite supports TimestampLtz as period type.
*/
public class LogicalSnapshot extends Snapshot {
// ~ Constructors -----------------------------------------------------------
+ /**
+ * Creates a LogicalSnapshot.
+ *
+ * <p>Use {@link #create} unless you know what you're doing.
+ *
+ * @param cluster Cluster that this relational expression belongs to
+ * @param traitSet The traits of this relational expression
+ * @param hints Hints for this node
+ * @param input Input relational expression
+ * @param period Timestamp expression; the table is read as it was at that time in the past
+ */
+ public LogicalSnapshot(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ List<RelHint> hints,
+ RelNode input,
+ RexNode period) {
+ super(cluster, traitSet, hints, input, period);
+ }
/**
* Creates a LogicalSnapshot.
@@ -55,12 +78,12 @@ public class LogicalSnapshot extends Snapshot {
*/
public LogicalSnapshot(
RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
- super(cluster, traitSet, input, period);
+ super(cluster, traitSet, ImmutableList.of(), input, period);
}
@Override
public Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period) {
- return new LogicalSnapshot(getCluster(), traitSet, input, period);
+ return new LogicalSnapshot(getCluster(), traitSet, hints, input, period);
}
/** Creates a LogicalSnapshot. */
@@ -93,4 +116,9 @@ public class LogicalSnapshot extends Snapshot {
}
return litmus.succeed();
}
+
+ @Override
+ public RelNode withHints(final List<RelHint> hintList) {
+ return new LogicalSnapshot(getCluster(), traitSet, hintList, input, getPeriod());
+ }
}