You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/23 13:18:13 UTC

[flink] 01/02: [FLINK-26518][table-planner] Port FlinkRelBuilder to Java

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1e1b182e702ec25a38c3c02d585734415eae110a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Mar 9 12:14:46 2022 +0100

    [FLINK-26518][table-planner] Port FlinkRelBuilder to Java
---
 .../table/planner/calcite/FlinkRelBuilder.java     | 235 +++++++++++++++++++++
 .../catalog/QueryOperationCatalogViewTable.java    |   8 +-
 .../table/planner/delegation/PlannerContext.java   |   2 +-
 .../plan/rules/logical/SubQueryDecorrelator.java   |   3 +-
 .../BatchPhysicalPythonWindowAggregateRule.java    |   3 +-
 ...reamPhysicalPythonGroupWindowAggregateRule.java |   2 +-
 .../table/planner/calcite/FlinkRelBuilder.scala    | 233 --------------------
 .../planner/expressions/fieldExpression.scala      |   7 -
 .../planner/expressions/windowProperties.scala     |   5 -
 .../nodes/calcite/LogicalWindowAggregate.scala     |   6 +-
 .../calcite/LogicalWindowTableAggregate.scala      |   4 +-
 .../plan/nodes/calcite/WindowAggregate.scala       |  11 +-
 .../plan/nodes/calcite/WindowTableAggregate.scala  |  10 +-
 .../logical/FlinkLogicalWindowAggregate.scala      |   2 +-
 .../logical/FlinkLogicalWindowTableAggregate.scala |   2 +-
 .../rules/logical/CorrelateSortToRankRule.scala    |   2 +-
 16 files changed, 267 insertions(+), 268 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
new file mode 100644
index 0000000..35ab473
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.plan.QueryOperationConverter;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalTableAggregate;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate;
+import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.plan.ViewExpanders;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;
+
+/** Flink-specific {@link RelBuilder}. */
+@Internal
+public final class FlinkRelBuilder extends RelBuilder {
+
+    private final QueryOperationConverter toRelNodeConverter;
+
+    private final ExpandFactory expandFactory;
+
+    private final RankFactory rankFactory;
+
+    private FlinkRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
+        super(context, cluster, relOptSchema);
+
+        this.toRelNodeConverter =
+                new QueryOperationConverter(this, context.unwrap(FlinkContext.class).isBatchMode());
+        this.expandFactory =
+                Util.first(
+                        context.unwrap(ExpandFactory.class),
+                        FlinkRelFactories.DEFAULT_EXPAND_FACTORY());
+        this.rankFactory =
+                Util.first(
+                        context.unwrap(RankFactory.class),
+                        FlinkRelFactories.DEFAULT_RANK_FACTORY());
+    }
+
+    public static FlinkRelBuilder of(
+            Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
+        return new FlinkRelBuilder(Preconditions.checkNotNull(context), cluster, relOptSchema);
+    }
+
+    public static FlinkRelBuilder of(RelOptCluster cluster, RelOptSchema relOptSchema) {
+        return FlinkRelBuilder.of(cluster.getPlanner().getContext(), cluster, relOptSchema);
+    }
+
+    public static RelBuilderFactory proto(Context context) {
+        return (cluster, schema) -> {
+            final Context clusterContext = cluster.getPlanner().getContext();
+            final Context chain = Contexts.chain(context, clusterContext);
+            return FlinkRelBuilder.of(chain, cluster, schema);
+        };
+    }
+
+    public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) {
+        final RelNode input = build();
+        final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex);
+        return push(expand);
+    }
+
+    public RelBuilder rank(
+            ImmutableBitSet partitionKey,
+            RelCollation orderKey,
+            RankType rankType,
+            RankRange rankRange,
+            RelDataTypeField rankNumberType,
+            boolean outputRankNumber) {
+        final RelNode input = build();
+        final RelNode rank =
+                rankFactory.createRank(
+                        input,
+                        partitionKey,
+                        orderKey,
+                        rankType,
+                        rankRange,
+                        rankNumberType,
+                        outputRankNumber);
+        return push(rank);
+    }
+
+    /** Build non-window aggregate for either aggregate or table aggregate. */
+    @Override
+    public RelBuilder aggregate(
+            RelBuilder.GroupKey groupKey, Iterable<RelBuilder.AggCall> aggCalls) {
+        // Build a RelNode; note that build() may also return a Project.
+        RelNode relNode = super.aggregate(groupKey, aggCalls).build();
+
+        if (relNode instanceof LogicalAggregate) {
+            final LogicalAggregate logicalAggregate = (LogicalAggregate) relNode;
+            if (isTableAggregate(logicalAggregate.getAggCallList())) {
+                relNode = LogicalTableAggregate.create(logicalAggregate);
+            } else if (isCountStarAgg(logicalAggregate)) {
+                final RelNode newAggInput =
+                        push(logicalAggregate.getInput(0)).project(literal(0)).build();
+                relNode =
+                        logicalAggregate.copy(
+                                logicalAggregate.getTraitSet(), ImmutableList.of(newAggInput));
+            }
+        }
+
+        return push(relNode);
+    }
+
+    /** Build window aggregate for either aggregate or table aggregate. */
+    public RelBuilder windowAggregate(
+            LogicalWindow window,
+            GroupKey groupKey,
+            List<NamedWindowProperty> namedProperties,
+            Iterable<AggCall> aggCalls) {
+        // build logical aggregate
+
+        // Because of:
+        // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
+        // if the input is a Project.
+        //
+        // A field cannot be pruned if it is referenced by other expressions
+        // of the window aggregation (i.e. the TUMBLE_START/END).
+        // To solve this, we configure the RelBuilder to disable this feature.
+        final LogicalAggregate aggregate =
+                (LogicalAggregate)
+                        super.transform(t -> t.withPruneInputOfAggregate(false))
+                                .push(build())
+                                .aggregate(groupKey, aggCalls)
+                                .build();
+
+        // build logical window aggregate from it
+        final RelNode windowAggregate;
+        if (isTableAggregate(aggregate.getAggCallList())) {
+            windowAggregate =
+                    LogicalWindowTableAggregate.create(window, namedProperties, aggregate);
+        } else {
+            windowAggregate = LogicalWindowAggregate.create(window, namedProperties, aggregate);
+        }
+        return push(windowAggregate);
+    }
+
+    /** Build watermark assigner relational node. */
+    public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
+        final RelNode input = build();
+        final RelNode relNode =
+                LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr);
+        return push(relNode);
+    }
+
+    public RelBuilder queryOperation(QueryOperation queryOperation) {
+        final RelNode relNode = queryOperation.accept(toRelNodeConverter);
+        return push(relNode);
+    }
+
+    public RelBuilder scan(ObjectIdentifier identifier, Map<String, String> dynamicOptions) {
+        final List<RelHint> hints = new ArrayList<>();
+        hints.add(
+                RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build());
+        final ToRelContext toRelContext = ViewExpanders.simpleContext(cluster, hints);
+        final RelNode relNode =
+                relOptSchema.getTableForMember(identifier.toList()).toRel(toRelContext);
+        return push(relNode);
+    }
+
+    @Override
+    public FlinkTypeFactory getTypeFactory() {
+        return (FlinkTypeFactory) super.getTypeFactory();
+    }
+
+    @Override
+    public RelBuilder transform(UnaryOperator<Config> transform) {
+        // Override in order to return a FlinkRelBuilder.
+        final Context mergedContext =
+                Contexts.of(transform.apply(Config.DEFAULT), cluster.getPlanner().getContext());
+        return FlinkRelBuilder.of(mergedContext, cluster, relOptSchema);
+    }
+
+    private static boolean isCountStarAgg(LogicalAggregate agg) {
+        if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
+            return false;
+        }
+        final AggregateCall call = agg.getAggCallList().get(0);
+        return call.getAggregation().getKind() == SqlKind.COUNT
+                && call.filterArg == -1
+                && call.getArgList().isEmpty();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index 9ba8e6e..32353c1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -70,8 +73,9 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable {
 
     @Override
     public RelNode convertToRel(RelOptTable.ToRelContext context) {
-        FlinkRelBuilder relBuilder =
-                FlinkRelBuilder.of(context, context.getCluster(), this.getRelOptSchema());
+        final RelOptCluster cluster = context.getCluster();
+        final Context chain = Contexts.of(context, cluster.getPlanner().getContext());
+        final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema());
 
         return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 19a4b6e..e98e582 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -184,7 +184,7 @@ public class PlannerContext {
                         context,
                         // Sets up the ViewExpander explicitly for FlinkRelBuilder.
                         createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext());
-        return new FlinkRelBuilder(chain, cluster, relOptSchema);
+        return FlinkRelBuilder.of(chain, cluster, relOptSchema);
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
index b80b884..828286b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
@@ -134,8 +134,7 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
         }
 
         RelOptCluster cluster = rootRel.getCluster();
-        RelBuilder relBuilder =
-                new FlinkRelBuilder(cluster.getPlanner().getContext(), cluster, null);
+        RelBuilder relBuilder = FlinkRelBuilder.of(cluster, null);
         RexBuilder rexBuilder = cluster.getRexBuilder();
 
         final SubQueryDecorrelator decorrelator =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
index 9eadb76..847b803 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -160,7 +161,7 @@ public class BatchPhysicalPythonWindowAggregateRule extends RelOptRule {
                         window,
                         inputTimeFieldIndex,
                         inputTimeIsDate,
-                        agg.getNamedProperties());
+                        JavaScalaConversionUtil.toScala(agg.getNamedProperties()));
         call.transformTo(windowAgg);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
index 002f1ae..5f080d7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
@@ -143,7 +143,7 @@ public class StreamPhysicalPythonGroupWindowAggregateRule extends ConverterRule
                 agg.getGroupSet().toArray(),
                 JavaScalaConversionUtil.toScala(aggCalls),
                 agg.getWindow(),
-                agg.getNamedProperties(),
+                JavaScalaConversionUtil.toScala(agg.getNamedProperties()),
                 emitStrategy);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
deleted file mode 100644
index b96e510..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.calcite
-
-import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory}
-import org.apache.flink.table.planner.expressions.WindowProperty
-import org.apache.flink.table.planner.plan.QueryOperationConverter
-import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate, LogicalWatermarkAssigner, LogicalWindowAggregate, LogicalWindowTableAggregate}
-import org.apache.flink.table.planner.plan.utils.AggregateUtil
-import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
-import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelCollation
-import org.apache.calcite.rel.`type`.RelDataTypeField
-import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.tools.RelBuilder.{AggCall, Config, GroupKey}
-import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
-import org.apache.calcite.util.{ImmutableBitSet, Util}
-import org.apache.flink.table.catalog.ObjectIdentifier
-import org.apache.flink.table.planner.hint.FlinkHints
-
-import java.lang.Iterable
-import java.util
-import java.util.List
-import java.util.function.UnaryOperator
-
-import scala.collection.JavaConversions._
-
-/**
-  * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
-  */
-class FlinkRelBuilder(
-    context: Context,
-    relOptCluster: RelOptCluster,
-    relOptSchema: RelOptSchema)
-  extends RelBuilder(
-    context,
-    relOptCluster,
-    relOptSchema) {
-
-  require(context != null)
-
-  private val toRelNodeConverter = {
-    new QueryOperationConverter(this, context.unwrap(classOf[FlinkContext]).isBatchMode)
-  }
-
-  private val expandFactory: ExpandFactory = {
-    Util.first(context.unwrap(classOf[ExpandFactory]), FlinkRelFactories.DEFAULT_EXPAND_FACTORY)
-  }
-
-  private val rankFactory: RankFactory = {
-    Util.first(context.unwrap(classOf[RankFactory]), FlinkRelFactories.DEFAULT_RANK_FACTORY)
-  }
-
-  override def getRelOptSchema: RelOptSchema = relOptSchema
-
-  override def getCluster: RelOptCluster = relOptCluster
-
-  override def getTypeFactory: FlinkTypeFactory =
-    super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-  override def transform(transform: UnaryOperator[RelBuilder.Config]): FlinkRelBuilder = {
-    // Override in order to return a FlinkRelBuilder.
-    FlinkRelBuilder.of(transform.apply(Config.DEFAULT), cluster, relOptSchema)
-  }
-
-  def expand(
-      projects: util.List[util.List[RexNode]],
-      expandIdIndex: Int): RelBuilder = {
-    val input = build()
-    val expand = expandFactory.createExpand(input, projects, expandIdIndex)
-    push(expand)
-  }
-
-  def rank(
-      partitionKey: ImmutableBitSet,
-      orderKey: RelCollation,
-      rankType: RankType,
-      rankRange: RankRange,
-      rankNumberType: RelDataTypeField,
-      outputRankNumber: Boolean): RelBuilder = {
-    val input = build()
-    val rank = rankFactory.createRank(input, partitionKey, orderKey, rankType, rankRange,
-      rankNumberType, outputRankNumber)
-    push(rank)
-  }
-
-  /**
-    * Build non-window aggregate for either aggregate or table aggregate.
-    */
-  override def aggregate(groupKey: GroupKey, aggCalls: Iterable[AggCall]): RelBuilder = {
-    // build a relNode, the build() may also return a project
-    val relNode = super.aggregate(groupKey, aggCalls).build()
-
-    def isCountStartAgg(agg: LogicalAggregate): Boolean = {
-      if (agg.getGroupCount != 0 || agg.getAggCallList.size() != 1) {
-        return false
-      }
-      val call = agg.getAggCallList.head
-      call.getAggregation.getKind == SqlKind.COUNT &&
-          call.filterArg == -1 && call.getArgList.isEmpty
-    }
-
-    relNode match {
-      case logicalAggregate: LogicalAggregate
-        if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) =>
-        push(LogicalTableAggregate.create(logicalAggregate))
-      case logicalAggregate2: LogicalAggregate
-        if isCountStartAgg(logicalAggregate2) =>
-        val newAggInput = push(logicalAggregate2.getInput(0))
-            .project(literal(0)).build()
-        push(logicalAggregate2.copy(logicalAggregate2.getTraitSet, ImmutableList.of(newAggInput)))
-      case _ => push(relNode)
-    }
-  }
-
-  /**
-    * Build window aggregate for either aggregate or table aggregate.
-    */
-  def windowAggregate(
-      window: LogicalWindow,
-      groupKey: GroupKey,
-      namedProperties: List[NamedWindowProperty],
-      aggCalls: Iterable[AggCall]): RelBuilder = {
-    // build logical aggregate
-
-    // Because of:
-    // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
-    // if the input is a Project.
-    //
-    // the field can not be pruned if it is referenced by other expressions
-    // of the window aggregation(i.e. the TUMBLE_START/END).
-    // To solve this, we config the RelBuilder to forbidden this feature.
-    val aggregate = super.transform(
-      new UnaryOperator[RelBuilder.Config] {
-        override def apply(t: RelBuilder.Config)
-          : RelBuilder.Config = t.withPruneInputOfAggregate(false)
-      })
-      .push(build())
-      .aggregate(groupKey, aggCalls)
-      .build()
-      .asInstanceOf[LogicalAggregate]
-
-    // build logical window aggregate from it
-    aggregate match {
-      case logicalAggregate: LogicalAggregate
-        if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) =>
-        push(LogicalWindowTableAggregate.create(window, namedProperties, aggregate))
-      case _ => push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
-    }
-  }
-
-  /**
-    * Build watermark assigner relation node.
-    */
-  def watermark(rowtimeFieldIndex: Int, watermarkExpr: RexNode): RelBuilder = {
-    val input = build()
-    val watermarkAssigner = LogicalWatermarkAssigner
-      .create(cluster, input, rowtimeFieldIndex, watermarkExpr)
-    push(watermarkAssigner)
-    this
-  }
-
-  def queryOperation(queryOperation: QueryOperation): RelBuilder = {
-    val relNode = queryOperation.accept(toRelNodeConverter)
-    push(relNode)
-    this
-  }
-
-  def scan(
-      identifier: ObjectIdentifier,
-      dynamicOptions: util.Map[String, String]): RelBuilder = {
-    val hints = new util.ArrayList[RelHint]
-    hints.add(RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build)
-    val toRelContext = ViewExpanders.simpleContext(cluster, hints)
-    push(relOptSchema.getTableForMember(identifier.toList).toRel(toRelContext))
-    this
-  }
-}
-
-object FlinkRelBuilder {
-
-  case class NamedWindowProperty(name: String, property: WindowProperty)
-
-  def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() {
-    def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder = {
-      val clusterContext = cluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
-      val mergedContext = Contexts.chain(context, clusterContext)
-
-      new FlinkRelBuilder(mergedContext, cluster, schema)
-    }
-  }
-
-  def of(cluster: RelOptCluster, relOptSchema: RelOptSchema): FlinkRelBuilder = {
-    val clusterContext = cluster.getPlanner.getContext
-    new FlinkRelBuilder(
-      clusterContext,
-      cluster,
-      relOptSchema)
-  }
-
-  def of(contextVar: Object, cluster: RelOptCluster, relOptSchema: RelOptSchema)
-    : FlinkRelBuilder = {
-    val mergedContext = Contexts.of(contextVar, cluster.getPlanner.getContext)
-    new FlinkRelBuilder(
-      mergedContext,
-      cluster,
-      relOptSchema)
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
index fd86f67..013e687 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.expressions
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
 import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory._
 import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
@@ -150,9 +149,6 @@ case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr)
     }
   }
 
-  override def toNamedWindowProperty(name: String): NamedWindowProperty =
-    NamedWindowProperty(name, this)
-
   override def toString: String = s"rowtime($child)"
 }
 
@@ -174,9 +170,6 @@ case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr
   override def resultType: TypeInformation[_] =
     TimeIndicatorTypeInfo.PROCTIME_INDICATOR
 
-  override def toNamedWindowProperty(name: String): NamedWindowProperty =
-    NamedWindowProperty(name, this)
-
   override def toString: String = s"proctime($child)"
 }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
index 0e68163..ce6940a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
@@ -19,13 +19,10 @@
 package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationSuccess}
 
 trait WindowProperty {
 
-  def toNamedWindowProperty(name: String): NamedWindowProperty
-
   def resultType: TypeInformation[_]
 
 }
@@ -42,8 +39,6 @@ abstract class AbstractWindowProperty(child: PlannerExpression)
     } else {
       ValidationFailure("Child must be a window reference.")
     }
-
-  def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this)
 }
 
 case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
index 0d70fd7..f3e35e1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
@@ -36,7 +36,7 @@ final class LogicalWindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties) {
 
   override def copy(
@@ -55,7 +55,7 @@ final class LogicalWindowAggregate(
       namedProperties)
   }
 
-  def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate = {
+  def copy(namedProperties: util.List[NamedWindowProperty]): LogicalWindowAggregate = {
     new LogicalWindowAggregate(
       cluster,
       traitSet,
@@ -71,7 +71,7 @@ object LogicalWindowAggregate {
 
   def create(
       window: LogicalWindow,
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: util.List[NamedWindowProperty],
       agg: Aggregate): LogicalWindowAggregate = {
     require(agg.getGroupType == Group.SIMPLE)
     val cluster: RelOptCluster = agg.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
index 6ae042f..f9a2ed2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
@@ -41,7 +41,7 @@ class LogicalWindowTableAggregate(
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowTableAggregate(
     cluster,
     traitSet,
@@ -69,7 +69,7 @@ object LogicalWindowTableAggregate {
 
   def create(
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: util.List[NamedWindowProperty],
     aggregate: Aggregate): LogicalWindowTableAggregate = {
 
     val cluster: RelOptCluster = aggregate.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
index 884be0a..c28dd1a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
@@ -31,6 +31,9 @@ import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
 
+import scala.collection.JavaConverters._
+
+
 /**
   * Relational operator that eliminates duplicates and computes totals with time window group.
   *
@@ -43,7 +46,7 @@ abstract class WindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends Aggregate(
     cluster,
     traitSet,
@@ -54,7 +57,7 @@ abstract class WindowAggregate(
 
   def getWindow: LogicalWindow = window
 
-  def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+  def getNamedProperties: util.List[NamedWindowProperty] = namedProperties
 
   override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
 
@@ -63,7 +66,7 @@ abstract class WindowAggregate(
     val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     val builder = typeFactory.builder
     builder.addAll(aggregateRowType.getFieldList)
-    namedProperties.foreach { namedProp =>
+    namedProperties.asScala.foreach { namedProp =>
       builder.add(
         namedProp.getName,
         typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType)
@@ -82,6 +85,6 @@ abstract class WindowAggregate(
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
       .item("window", window)
-      .item("properties", namedProperties.map(_.getName).mkString(", "))
+      .item("properties", namedProperties.asScala.map(_.getName).mkString(", "))
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
index 98bcaea..d388b5b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
@@ -30,6 +30,8 @@ import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 /**
   * Relational operator that represents a window table aggregate. A TableAggregate is similar to the
   * [[org.apache.calcite.rel.core.Aggregate]] but may output 0 or more records for a group.
@@ -42,19 +44,19 @@ abstract class WindowTableAggregate(
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends TableAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls) {
 
   def getWindow: LogicalWindow = window
 
-  def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+  def getNamedProperties: util.List[NamedWindowProperty] = namedProperties
 
   override def deriveRowType(): RelDataType = {
     val aggregateRowType = super.deriveRowType()
     val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     val builder = typeFactory.builder
     builder.addAll(aggregateRowType.getFieldList)
-    namedProperties.foreach { namedProp =>
+    namedProperties.asScala.foreach { namedProp =>
       builder.add(
         namedProp.getName,
         typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType)
@@ -66,6 +68,6 @@ abstract class WindowTableAggregate(
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
       .item("window", window)
-      .item("properties", namedProperties.map(_.getName).mkString(", "))
+      .item("properties", namedProperties.asScala.map(_.getName).mkString(", "))
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8cef117..7884f59 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -43,7 +43,7 @@ class FlinkLogicalWindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties)
   with FlinkLogicalRel {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
index add5d41..7dcc81b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
@@ -44,7 +44,7 @@ class FlinkLogicalWindowTableAggregate(
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowTableAggregate(
     cluster,
     traitSet,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
index b2d0797..e8013a8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
@@ -175,7 +175,7 @@ class CorrelateSortToRankRule extends RelOptRule(
           1,
           sort.fetch.asInstanceOf[RexLiteral].getValueAs(classOf[java.lang.Long])),
         null,
-        outputRankNumber = false)
+        false)
       .project(projects)
       .build()