You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/23 13:18:13 UTC
[flink] 01/02: [FLINK-26518][table-planner] Port FlinkRelBuilder to Java
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e1b182e702ec25a38c3c02d585734415eae110a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Mar 9 12:14:46 2022 +0100
[FLINK-26518][table-planner] Port FlinkRelBuilder to Java
---
.../table/planner/calcite/FlinkRelBuilder.java | 235 +++++++++++++++++++++
.../catalog/QueryOperationCatalogViewTable.java | 8 +-
.../table/planner/delegation/PlannerContext.java | 2 +-
.../plan/rules/logical/SubQueryDecorrelator.java | 3 +-
.../BatchPhysicalPythonWindowAggregateRule.java | 3 +-
...reamPhysicalPythonGroupWindowAggregateRule.java | 2 +-
.../table/planner/calcite/FlinkRelBuilder.scala | 233 --------------------
.../planner/expressions/fieldExpression.scala | 7 -
.../planner/expressions/windowProperties.scala | 5 -
.../nodes/calcite/LogicalWindowAggregate.scala | 6 +-
.../calcite/LogicalWindowTableAggregate.scala | 4 +-
.../plan/nodes/calcite/WindowAggregate.scala | 11 +-
.../plan/nodes/calcite/WindowTableAggregate.scala | 10 +-
.../logical/FlinkLogicalWindowAggregate.scala | 2 +-
.../logical/FlinkLogicalWindowTableAggregate.scala | 2 +-
.../rules/logical/CorrelateSortToRankRule.scala | 2 +-
16 files changed, 267 insertions(+), 268 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
new file mode 100644
index 0000000..35ab473
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.plan.QueryOperationConverter;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalTableAggregate;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate;
+import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.plan.ViewExpanders;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;
+
/**
 * Flink-specific {@link RelBuilder}.
 *
 * <p>On top of the standard Calcite builder it adds operations that produce Flink's own
 * relational nodes: {@link #expand}, {@link #rank}, {@link #windowAggregate}, {@link #watermark},
 * translation of a {@link QueryOperation} tree via {@link #queryOperation}, and a {@link #scan}
 * variant that attaches dynamic table options as an OPTIONS hint. It also narrows
 * {@link #getTypeFactory()} to {@link FlinkTypeFactory}.
 */
@Internal
public final class FlinkRelBuilder extends RelBuilder {

    // Translates QueryOperation trees to RelNode trees; configured for batch vs. streaming
    // based on the FlinkContext unwrapped from the builder's context.
    private final QueryOperationConverter toRelNodeConverter;

    // Factory for Expand nodes; may be overridden through the context, see constructor.
    private final ExpandFactory expandFactory;

    // Factory for Rank nodes; may be overridden through the context, see constructor.
    private final RankFactory rankFactory;

    private FlinkRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
        super(context, cluster, relOptSchema);

        // NOTE(review): assumes the context always contains a FlinkContext; unwrap() returns
        // null otherwise and this line would fail with an NPE — confirm all call sites chain one.
        this.toRelNodeConverter =
                new QueryOperationConverter(this, context.unwrap(FlinkContext.class).isBatchMode());
        // Prefer a factory provided via the context; otherwise fall back to the defaults.
        this.expandFactory =
                Util.first(
                        context.unwrap(ExpandFactory.class),
                        FlinkRelFactories.DEFAULT_EXPAND_FACTORY());
        this.rankFactory =
                Util.first(
                        context.unwrap(RankFactory.class),
                        FlinkRelFactories.DEFAULT_RANK_FACTORY());
    }

    /** Creates a builder for the given (non-null) context, cluster, and schema. */
    public static FlinkRelBuilder of(
            Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
        return new FlinkRelBuilder(Preconditions.checkNotNull(context), cluster, relOptSchema);
    }

    /** Creates a builder that reuses the context of the cluster's planner. */
    public static FlinkRelBuilder of(RelOptCluster cluster, RelOptSchema relOptSchema) {
        return FlinkRelBuilder.of(cluster.getPlanner().getContext(), cluster, relOptSchema);
    }

    /**
     * Returns a {@link RelBuilderFactory} that creates {@link FlinkRelBuilder}s whose context is
     * the given context chained in front of the planner context of the cluster at hand.
     */
    public static RelBuilderFactory proto(Context context) {
        return (cluster, schema) -> {
            final Context clusterContext = cluster.getPlanner().getContext();
            final Context chain = Contexts.chain(context, clusterContext);
            return FlinkRelBuilder.of(chain, cluster, schema);
        };
    }

    /**
     * Pops the current input and pushes an Expand node created by the configured
     * {@link ExpandFactory} for the given projection rows and expand-id column index.
     */
    public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) {
        final RelNode input = build();
        final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex);
        return push(expand);
    }

    /**
     * Pops the current input and pushes a Rank node created by the configured
     * {@link RankFactory}.
     */
    public RelBuilder rank(
            ImmutableBitSet partitionKey,
            RelCollation orderKey,
            RankType rankType,
            RankRange rankRange,
            RelDataTypeField rankNumberType,
            boolean outputRankNumber) {
        final RelNode input = build();
        final RelNode rank =
                rankFactory.createRank(
                        input,
                        partitionKey,
                        orderKey,
                        rankType,
                        rankRange,
                        rankNumberType,
                        outputRankNumber);
        return push(rank);
    }

    /**
     * Build non-window aggregate for either aggregate or table aggregate.
     *
     * <p>Rewrites two special cases of the plain Calcite aggregate: table aggregates become
     * {@link LogicalTableAggregate}, and a COUNT(*) without grouping is rewritten to count over a
     * constant-0 projection of its input.
     */
    @Override
    public RelBuilder aggregate(
            RelBuilder.GroupKey groupKey, Iterable<RelBuilder.AggCall> aggCalls) {
        // build a relNode, the build() may also return a project
        RelNode relNode = super.aggregate(groupKey, aggCalls).build();

        if (relNode instanceof LogicalAggregate) {
            final LogicalAggregate logicalAggregate = (LogicalAggregate) relNode;
            if (isTableAggregate(logicalAggregate.getAggCallList())) {
                relNode = LogicalTableAggregate.create(logicalAggregate);
            } else if (isCountStarAgg(logicalAggregate)) {
                // Replace the aggregate input with a projection of the literal 0 so that
                // COUNT(*) does not depend on any actual input column.
                final RelNode newAggInput =
                        push(logicalAggregate.getInput(0)).project(literal(0)).build();
                relNode =
                        logicalAggregate.copy(
                                logicalAggregate.getTraitSet(), ImmutableList.of(newAggInput));
            }
        }

        return push(relNode);
    }

    /**
     * Build window aggregate for either aggregate or table aggregate.
     *
     * @param window the logical window definition
     * @param groupKey grouping key of the aggregation
     * @param namedProperties named window properties (e.g. window start/end) to expose
     * @param aggCalls the aggregate calls
     */
    public RelBuilder windowAggregate(
            LogicalWindow window,
            GroupKey groupKey,
            List<NamedWindowProperty> namedProperties,
            Iterable<AggCall> aggCalls) {
        // build logical aggregate

        // Because of:
        // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
        // if the input is a Project.
        //
        // the field can not be pruned if it is referenced by other expressions
        // of the window aggregation (i.e. the TUMBLE_START/END).
        // To solve this, we configure the RelBuilder to forbid this feature.
        final LogicalAggregate aggregate =
                (LogicalAggregate)
                        super.transform(t -> t.withPruneInputOfAggregate(false))
                                .push(build())
                                .aggregate(groupKey, aggCalls)
                                .build();

        // build logical window aggregate from it
        final RelNode windowAggregate;
        if (isTableAggregate(aggregate.getAggCallList())) {
            windowAggregate =
                    LogicalWindowTableAggregate.create(window, namedProperties, aggregate);
        } else {
            windowAggregate = LogicalWindowAggregate.create(window, namedProperties, aggregate);
        }
        return push(windowAggregate);
    }

    /** Build watermark assigner relational node. */
    public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
        final RelNode input = build();
        final RelNode relNode =
                LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr);
        return push(relNode);
    }

    /** Translates the given {@link QueryOperation} tree and pushes the resulting node. */
    public RelBuilder queryOperation(QueryOperation queryOperation) {
        final RelNode relNode = queryOperation.accept(toRelNodeConverter);
        return push(relNode);
    }

    /**
     * Scans the table identified by {@code identifier}, attaching the given dynamic options as an
     * OPTIONS hint on the resulting scan.
     */
    public RelBuilder scan(ObjectIdentifier identifier, Map<String, String> dynamicOptions) {
        final List<RelHint> hints = new ArrayList<>();
        hints.add(
                RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build());
        final ToRelContext toRelContext = ViewExpanders.simpleContext(cluster, hints);
        final RelNode relNode =
                relOptSchema.getTableForMember(identifier.toList()).toRel(toRelContext);
        return push(relNode);
    }

    @Override
    public FlinkTypeFactory getTypeFactory() {
        return (FlinkTypeFactory) super.getTypeFactory();
    }

    @Override
    public RelBuilder transform(UnaryOperator<Config> transform) {
        // Override in order to return a FlinkRelBuilder.
        // NOTE(review): the transform is applied to Config.DEFAULT, not to this builder's current
        // config, so any previously applied config changes are discarded here — confirm that this
        // matches the intent of all callers (the Scala original behaved the same way).
        final Context mergedContext =
                Contexts.of(transform.apply(Config.DEFAULT), cluster.getPlanner().getContext());
        return FlinkRelBuilder.of(mergedContext, cluster, relOptSchema);
    }

    /** Returns true for a grouping-free COUNT(*): no group keys, one unfiltered COUNT with no args. */
    private static boolean isCountStarAgg(LogicalAggregate agg) {
        if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
            return false;
        }
        final AggregateCall call = agg.getAggCallList().get(0);
        return call.getAggregation().getKind() == SqlKind.COUNT
                && call.filterArg == -1
                && call.getArgList().isEmpty();
    }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index 9ba8e6e..32353c1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
@@ -70,8 +73,9 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable {
@Override
public RelNode convertToRel(RelOptTable.ToRelContext context) {
- FlinkRelBuilder relBuilder =
- FlinkRelBuilder.of(context, context.getCluster(), this.getRelOptSchema());
+ final RelOptCluster cluster = context.getCluster();
+ final Context chain = Contexts.of(context, cluster.getPlanner().getContext());
+ final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema());
return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 19a4b6e..e98e582 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -184,7 +184,7 @@ public class PlannerContext {
context,
// Sets up the ViewExpander explicitly for FlinkRelBuilder.
createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext());
- return new FlinkRelBuilder(chain, cluster, relOptSchema);
+ return FlinkRelBuilder.of(chain, cluster, relOptSchema);
}
/**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
index b80b884..828286b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
@@ -134,8 +134,7 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
}
RelOptCluster cluster = rootRel.getCluster();
- RelBuilder relBuilder =
- new FlinkRelBuilder(cluster.getPlanner().getContext(), cluster, null);
+ RelBuilder relBuilder = FlinkRelBuilder.of(cluster, null);
RexBuilder rexBuilder = cluster.getRexBuilder();
final SubQueryDecorrelator decorrelator =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
index 9eadb76..847b803 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.types.DataType;
import org.apache.calcite.plan.RelOptRule;
@@ -160,7 +161,7 @@ public class BatchPhysicalPythonWindowAggregateRule extends RelOptRule {
window,
inputTimeFieldIndex,
inputTimeIsDate,
- agg.getNamedProperties());
+ JavaScalaConversionUtil.toScala(agg.getNamedProperties()));
call.transformTo(windowAgg);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
index 002f1ae..5f080d7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
@@ -143,7 +143,7 @@ public class StreamPhysicalPythonGroupWindowAggregateRule extends ConverterRule
agg.getGroupSet().toArray(),
JavaScalaConversionUtil.toScala(aggCalls),
agg.getWindow(),
- agg.getNamedProperties(),
+ JavaScalaConversionUtil.toScala(agg.getNamedProperties()),
emitStrategy);
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
deleted file mode 100644
index b96e510..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.calcite
-
-import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory}
-import org.apache.flink.table.planner.expressions.WindowProperty
-import org.apache.flink.table.planner.plan.QueryOperationConverter
-import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate, LogicalWatermarkAssigner, LogicalWindowAggregate, LogicalWindowTableAggregate}
-import org.apache.flink.table.planner.plan.utils.AggregateUtil
-import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
-import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelCollation
-import org.apache.calcite.rel.`type`.RelDataTypeField
-import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.tools.RelBuilder.{AggCall, Config, GroupKey}
-import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
-import org.apache.calcite.util.{ImmutableBitSet, Util}
-import org.apache.flink.table.catalog.ObjectIdentifier
-import org.apache.flink.table.planner.hint.FlinkHints
-
-import java.lang.Iterable
-import java.util
-import java.util.List
-import java.util.function.UnaryOperator
-
-import scala.collection.JavaConversions._
-
-/**
- * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
- */
-class FlinkRelBuilder(
- context: Context,
- relOptCluster: RelOptCluster,
- relOptSchema: RelOptSchema)
- extends RelBuilder(
- context,
- relOptCluster,
- relOptSchema) {
-
- require(context != null)
-
- private val toRelNodeConverter = {
- new QueryOperationConverter(this, context.unwrap(classOf[FlinkContext]).isBatchMode)
- }
-
- private val expandFactory: ExpandFactory = {
- Util.first(context.unwrap(classOf[ExpandFactory]), FlinkRelFactories.DEFAULT_EXPAND_FACTORY)
- }
-
- private val rankFactory: RankFactory = {
- Util.first(context.unwrap(classOf[RankFactory]), FlinkRelFactories.DEFAULT_RANK_FACTORY)
- }
-
- override def getRelOptSchema: RelOptSchema = relOptSchema
-
- override def getCluster: RelOptCluster = relOptCluster
-
- override def getTypeFactory: FlinkTypeFactory =
- super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
- override def transform(transform: UnaryOperator[RelBuilder.Config]): FlinkRelBuilder = {
- // Override in order to return a FlinkRelBuilder.
- FlinkRelBuilder.of(transform.apply(Config.DEFAULT), cluster, relOptSchema)
- }
-
- def expand(
- projects: util.List[util.List[RexNode]],
- expandIdIndex: Int): RelBuilder = {
- val input = build()
- val expand = expandFactory.createExpand(input, projects, expandIdIndex)
- push(expand)
- }
-
- def rank(
- partitionKey: ImmutableBitSet,
- orderKey: RelCollation,
- rankType: RankType,
- rankRange: RankRange,
- rankNumberType: RelDataTypeField,
- outputRankNumber: Boolean): RelBuilder = {
- val input = build()
- val rank = rankFactory.createRank(input, partitionKey, orderKey, rankType, rankRange,
- rankNumberType, outputRankNumber)
- push(rank)
- }
-
- /**
- * Build non-window aggregate for either aggregate or table aggregate.
- */
- override def aggregate(groupKey: GroupKey, aggCalls: Iterable[AggCall]): RelBuilder = {
- // build a relNode, the build() may also return a project
- val relNode = super.aggregate(groupKey, aggCalls).build()
-
- def isCountStartAgg(agg: LogicalAggregate): Boolean = {
- if (agg.getGroupCount != 0 || agg.getAggCallList.size() != 1) {
- return false
- }
- val call = agg.getAggCallList.head
- call.getAggregation.getKind == SqlKind.COUNT &&
- call.filterArg == -1 && call.getArgList.isEmpty
- }
-
- relNode match {
- case logicalAggregate: LogicalAggregate
- if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) =>
- push(LogicalTableAggregate.create(logicalAggregate))
- case logicalAggregate2: LogicalAggregate
- if isCountStartAgg(logicalAggregate2) =>
- val newAggInput = push(logicalAggregate2.getInput(0))
- .project(literal(0)).build()
- push(logicalAggregate2.copy(logicalAggregate2.getTraitSet, ImmutableList.of(newAggInput)))
- case _ => push(relNode)
- }
- }
-
- /**
- * Build window aggregate for either aggregate or table aggregate.
- */
- def windowAggregate(
- window: LogicalWindow,
- groupKey: GroupKey,
- namedProperties: List[NamedWindowProperty],
- aggCalls: Iterable[AggCall]): RelBuilder = {
- // build logical aggregate
-
- // Because of:
- // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
- // if the input is a Project.
- //
- // the field can not be pruned if it is referenced by other expressions
- // of the window aggregation(i.e. the TUMBLE_START/END).
- // To solve this, we config the RelBuilder to forbidden this feature.
- val aggregate = super.transform(
- new UnaryOperator[RelBuilder.Config] {
- override def apply(t: RelBuilder.Config)
- : RelBuilder.Config = t.withPruneInputOfAggregate(false)
- })
- .push(build())
- .aggregate(groupKey, aggCalls)
- .build()
- .asInstanceOf[LogicalAggregate]
-
- // build logical window aggregate from it
- aggregate match {
- case logicalAggregate: LogicalAggregate
- if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) =>
- push(LogicalWindowTableAggregate.create(window, namedProperties, aggregate))
- case _ => push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
- }
- }
-
- /**
- * Build watermark assigner relation node.
- */
- def watermark(rowtimeFieldIndex: Int, watermarkExpr: RexNode): RelBuilder = {
- val input = build()
- val watermarkAssigner = LogicalWatermarkAssigner
- .create(cluster, input, rowtimeFieldIndex, watermarkExpr)
- push(watermarkAssigner)
- this
- }
-
- def queryOperation(queryOperation: QueryOperation): RelBuilder = {
- val relNode = queryOperation.accept(toRelNodeConverter)
- push(relNode)
- this
- }
-
- def scan(
- identifier: ObjectIdentifier,
- dynamicOptions: util.Map[String, String]): RelBuilder = {
- val hints = new util.ArrayList[RelHint]
- hints.add(RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build)
- val toRelContext = ViewExpanders.simpleContext(cluster, hints)
- push(relOptSchema.getTableForMember(identifier.toList).toRel(toRelContext))
- this
- }
-}
-
-object FlinkRelBuilder {
-
- case class NamedWindowProperty(name: String, property: WindowProperty)
-
- def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() {
- def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder = {
- val clusterContext = cluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
- val mergedContext = Contexts.chain(context, clusterContext)
-
- new FlinkRelBuilder(mergedContext, cluster, schema)
- }
- }
-
- def of(cluster: RelOptCluster, relOptSchema: RelOptSchema): FlinkRelBuilder = {
- val clusterContext = cluster.getPlanner.getContext
- new FlinkRelBuilder(
- clusterContext,
- cluster,
- relOptSchema)
- }
-
- def of(contextVar: Object, cluster: RelOptCluster, relOptSchema: RelOptSchema)
- : FlinkRelBuilder = {
- val mergedContext = Contexts.of(contextVar, cluster.getPlanner.getContext)
- new FlinkRelBuilder(
- mergedContext,
- cluster,
- relOptSchema)
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
index fd86f67..013e687 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.expressions
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api._
import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.calcite.FlinkTypeFactory._
import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
@@ -150,9 +149,6 @@ case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr)
}
}
- override def toNamedWindowProperty(name: String): NamedWindowProperty =
- NamedWindowProperty(name, this)
-
override def toString: String = s"rowtime($child)"
}
@@ -174,9 +170,6 @@ case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr
override def resultType: TypeInformation[_] =
TimeIndicatorTypeInfo.PROCTIME_INDICATOR
- override def toNamedWindowProperty(name: String): NamedWindowProperty =
- NamedWindowProperty(name, this)
-
override def toString: String = s"proctime($child)"
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
index 0e68163..ce6940a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
@@ -19,13 +19,10 @@
package org.apache.flink.table.planner.expressions
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationSuccess}
trait WindowProperty {
- def toNamedWindowProperty(name: String): NamedWindowProperty
-
def resultType: TypeInformation[_]
}
@@ -42,8 +39,6 @@ abstract class AbstractWindowProperty(child: PlannerExpression)
} else {
ValidationFailure("Child must be a window reference.")
}
-
- def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this)
}
case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
index 0d70fd7..f3e35e1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
@@ -36,7 +36,7 @@ final class LogicalWindowAggregate(
groupSet: ImmutableBitSet,
aggCalls: util.List[AggregateCall],
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty])
+ namedProperties: util.List[NamedWindowProperty])
extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties) {
override def copy(
@@ -55,7 +55,7 @@ final class LogicalWindowAggregate(
namedProperties)
}
- def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate = {
+ def copy(namedProperties: util.List[NamedWindowProperty]): LogicalWindowAggregate = {
new LogicalWindowAggregate(
cluster,
traitSet,
@@ -71,7 +71,7 @@ object LogicalWindowAggregate {
def create(
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty],
+ namedProperties: util.List[NamedWindowProperty],
agg: Aggregate): LogicalWindowAggregate = {
require(agg.getGroupType == Group.SIMPLE)
val cluster: RelOptCluster = agg.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
index 6ae042f..f9a2ed2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
@@ -41,7 +41,7 @@ class LogicalWindowTableAggregate(
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall],
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty])
+ namedProperties: util.List[NamedWindowProperty])
extends WindowTableAggregate(
cluster,
traitSet,
@@ -69,7 +69,7 @@ object LogicalWindowTableAggregate {
def create(
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty],
+ namedProperties: util.List[NamedWindowProperty],
aggregate: Aggregate): LogicalWindowTableAggregate = {
val cluster: RelOptCluster = aggregate.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
index 884be0a..c28dd1a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
@@ -31,6 +31,9 @@ import org.apache.calcite.util.ImmutableBitSet
import java.util
+import scala.collection.JavaConverters._
+
+
/**
* Relational operator that eliminates duplicates and computes totals with time window group.
*
@@ -43,7 +46,7 @@ abstract class WindowAggregate(
groupSet: ImmutableBitSet,
aggCalls: util.List[AggregateCall],
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty])
+ namedProperties: util.List[NamedWindowProperty])
extends Aggregate(
cluster,
traitSet,
@@ -54,7 +57,7 @@ abstract class WindowAggregate(
def getWindow: LogicalWindow = window
- def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+ def getNamedProperties: util.List[NamedWindowProperty] = namedProperties
override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
@@ -63,7 +66,7 @@ abstract class WindowAggregate(
val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val builder = typeFactory.builder
builder.addAll(aggregateRowType.getFieldList)
- namedProperties.foreach { namedProp =>
+ namedProperties.asScala.foreach { namedProp =>
builder.add(
namedProp.getName,
typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType)
@@ -82,6 +85,6 @@ abstract class WindowAggregate(
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.item("window", window)
- .item("properties", namedProperties.map(_.getName).mkString(", "))
+ .item("properties", namedProperties.asScala.map(_.getName).mkString(", "))
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
index 98bcaea..d388b5b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
@@ -30,6 +30,8 @@ import org.apache.calcite.util.ImmutableBitSet
import java.util
+import scala.collection.JavaConverters._
+
/**
* Relational operator that represents a window table aggregate. A TableAggregate is similar to the
* [[org.apache.calcite.rel.core.Aggregate]] but may output 0 or more records for a group.
@@ -42,19 +44,19 @@ abstract class WindowTableAggregate(
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall],
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty])
+ namedProperties: util.List[NamedWindowProperty])
extends TableAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls) {
def getWindow: LogicalWindow = window
- def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+ def getNamedProperties: util.List[NamedWindowProperty] = namedProperties
override def deriveRowType(): RelDataType = {
val aggregateRowType = super.deriveRowType()
val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val builder = typeFactory.builder
builder.addAll(aggregateRowType.getFieldList)
- namedProperties.foreach { namedProp =>
+ namedProperties.asScala.foreach { namedProp =>
builder.add(
namedProp.getName,
typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType)
@@ -66,6 +68,6 @@ abstract class WindowTableAggregate(
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.item("window", window)
- .item("properties", namedProperties.map(_.getName).mkString(", "))
+ .item("properties", namedProperties.asScala.map(_.getName).mkString(", "))
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8cef117..7884f59 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -43,7 +43,7 @@ class FlinkLogicalWindowAggregate(
groupSet: ImmutableBitSet,
aggCalls: util.List[AggregateCall],
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty])
+ namedProperties: util.List[NamedWindowProperty])
extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties)
with FlinkLogicalRel {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
index add5d41..7dcc81b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
@@ -44,7 +44,7 @@ class FlinkLogicalWindowTableAggregate(
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall],
window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty])
+ namedProperties: util.List[NamedWindowProperty])
extends WindowTableAggregate(
cluster,
traitSet,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
index b2d0797..e8013a8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
@@ -175,7 +175,7 @@ class CorrelateSortToRankRule extends RelOptRule(
1,
sort.fetch.asInstanceOf[RexLiteral].getValueAs(classOf[java.lang.Long])),
null,
- outputRankNumber = false)
+ false)
.project(projects)
.build()