You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:19 UTC

[flink] 05/13: [FLINK-14338][table-planner][table-planner-blink] Tweak implementations due to API change

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7dad5c47c92db305dc39f58acb1f8cf88b6eaed1
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 20:36:40 2020 +0800

    [FLINK-14338][table-planner][table-planner-blink] Tweak implementations due to API change
    
    * Replace RelOptUtils.createCastRel with RelOptUtil.createCastRel
    * Implement RelOptTable#getKeys and Statistic#getKeys
    * Changes logical nodes constructor for hints
    * Implement RelShuttle.visit(LogicalTableModify)
---
 .../table/planner/catalog/SqlCatalogViewTable.java |  4 +-
 .../plan/schema/FlinkPreparingTableBase.java       |  5 ++
 .../table/planner/plan/utils/RelOptUtils.java      | 85 ----------------------
 .../planner/calcite/FlinkLogicalRelFactories.scala | 17 ++++-
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  4 +
 .../table/planner/calcite/FlinkRelOptCluster.scala | 77 --------------------
 .../calcite/RelTimeIndicatorConverter.scala        |  6 ++
 .../planner/plan/nodes/common/CommonCalc.scala     |  6 +-
 .../plan/nodes/logical/FlinkLogicalAggregate.scala |  8 +-
 .../logical/FlinkLogicalDataStreamTableScan.scala  |  4 +-
 .../plan/nodes/logical/FlinkLogicalJoin.scala      |  6 +-
 .../logical/FlinkLogicalTableSourceScan.scala      |  4 +-
 .../table/planner/plan/stats/FlinkStatistic.scala  |  6 +-
 .../table/planner/plan/utils/AggregateUtil.scala   |  4 -
 .../table/planner/plan/utils/RelShuttles.scala     |  2 +
 .../flink/table/planner/sinks/TableSinkUtils.scala |  7 +-
 .../planner/plan/common/ViewsExpandingTest.scala   |  1 -
 .../metadata/AggCallSelectivityEstimatorTest.scala | 10 +--
 .../plan/metadata/SelectivityEstimatorTest.scala   |  7 +-
 .../optimize/program/FlinkChainedProgramTest.scala |  7 +-
 .../table/calcite/RelTimeIndicatorConverter.scala  |  7 ++
 .../flink/table/plan/stats/FlinkStatistic.scala    |  2 +
 22 files changed, 81 insertions(+), 198 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
index a9f56e1..cf2fb35 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.planner.catalog;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
-import org.apache.flink.table.planner.plan.utils.RelOptUtils;
 
 import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 
@@ -57,6 +57,6 @@ public class SqlCatalogViewTable extends ExpandingPreparingTable {
 		RelNode original = context
 				.expandView(rowType, view.getExpandedQuery(), viewPath, names)
 				.project();
-		return RelOptUtils.createCastRel(original, rowType);
+		return RelOptUtil.createCastRel(original, rowType, true);
 	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java
index ef4ff37..98d1711 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java
@@ -268,6 +268,11 @@ public abstract class FlinkPreparingTableBase extends Prepare.AbstractPreparingT
 		}
 	}
 
+	@Override
+	public List<ImmutableBitSet> getKeys() {
+		return statistic.getKeys();
+	}
+
 	/**
 	 * Returns unique keySets of current table.
 	 */
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java
deleted file mode 100644
index d3dfb3e..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.utils;
-
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * <code>RelOptUtils</code> defines static utility methods for use in optimizing
- * {@link RelNode}s.
- *
- * <p>This is an extension of {@link org.apache.calcite.plan.RelOptUtil}.
- */
-public class RelOptUtils {
-	/**
-	 * Creates a projection which casts a rel's output to a desired row type.
-	 *
-	 * <p>This method is inspired by {@link RelOptUtil#createCastRel}, different with that,
-	 * we do not generate another {@link Project} if the {@code rel} is already a {@link Project}.
-	 *
-	 * @param rel Producer of rows to be converted
-	 * @param castRowType Row type after cast
-	 * @return Conversion rel with castRowType
-	 */
-	public static RelNode createCastRel(RelNode rel, RelDataType castRowType) {
-		RelFactories.ProjectFactory projectFactory = RelFactories.DEFAULT_PROJECT_FACTORY;
-		final RelDataType oriRowType = rel.getRowType();
-		if (RelOptUtil.areRowTypesEqual(oriRowType, castRowType, true)) {
-			// nothing to do
-			return rel;
-		}
-		final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
-
-		final List<RelDataTypeField> fieldList = oriRowType.getFieldList();
-		int n = fieldList.size();
-		assert n == castRowType.getFieldCount()
-			: "field count: lhs [" + castRowType + "] rhs [" + oriRowType + "]";
-
-		final List<RexNode> rhsExps;
-		final RelNode input;
-		if (rel instanceof Project) {
-			rhsExps = ((Project) rel).getProjects();
-			// Avoid to generate redundant project node.
-			input = rel.getInput(0);
-		} else {
-			rhsExps = new ArrayList<>();
-			for (RelDataTypeField field : fieldList) {
-				rhsExps.add(rexBuilder.makeInputRef(field.getType(), field.getIndex()));
-			}
-			input = rel;
-		}
-
-		final List<RexNode> castExps =
-				RexUtil.generateCastExpressions(rexBuilder, castRowType, rhsExps);
-		// Use names and types from castRowType.
-		return projectFactory.createProject(input, castExps,
-				castRowType.getFieldNames());
-	}
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
index 99260d3..56ddbf0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
@@ -25,10 +25,12 @@ import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
 import org.apache.flink.table.sinks.TableSink
 
 import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptTable.ToRelContext
 import org.apache.calcite.plan.{Contexts, RelOptCluster, RelOptTable}
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.calcite.rel.core.RelFactories._
 import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelCollation, RelNode}
 import org.apache.calcite.rex._
@@ -82,6 +84,7 @@ object FlinkLogicalRelFactories {
   class ProjectFactoryImpl extends ProjectFactory {
     def createProject(
         input: RelNode,
+        hints: util.List[RelHint],
         childExprs: util.List[_ <: RexNode],
         fieldNames: util.List[String]): RelNode = {
       val rexBuilder = input.getCluster.getRexBuilder
@@ -134,6 +137,7 @@ object FlinkLogicalRelFactories {
   class AggregateFactoryImpl extends AggregateFactory {
     def createAggregate(
         input: RelNode,
+        hints: util.List[RelHint],
         groupSet: ImmutableBitSet,
         groupSets: ImmutableList[ImmutableBitSet],
         aggCalls: util.List[AggregateCall]): RelNode = {
@@ -168,6 +172,7 @@ object FlinkLogicalRelFactories {
     def createJoin(
         left: RelNode,
         right: RelNode,
+        hints: util.List[RelHint],
         condition: RexNode,
         variablesSet: util.Set[CorrelationId],
         joinType: JoinRelType,
@@ -208,14 +213,18 @@ object FlinkLogicalRelFactories {
     * [[FlinkLogicalTableSourceScan]] or [[FlinkLogicalDataStreamTableScan]].
     */
   class TableScanFactoryImpl extends TableScanFactory {
-    def createScan(cluster: RelOptCluster, table: RelOptTable): RelNode = {
-      val tableScan = LogicalTableScan.create(cluster, table)
+    def createScan(toRelContext: ToRelContext, table: RelOptTable): RelNode = {
+      val cluster = toRelContext.getCluster
+      val hints = toRelContext.getTableHints
+      val tableScan = LogicalTableScan.create(cluster, table, hints)
       tableScan match {
         case s: LogicalTableScan if FlinkLogicalTableSourceScan.isTableSourceScan(s) =>
-          FlinkLogicalTableSourceScan.create(cluster,
+          FlinkLogicalTableSourceScan.create(
+            cluster,
             s.getTable.asInstanceOf[FlinkPreparingTableBase])
         case s: LogicalTableScan if FlinkLogicalDataStreamTableScan.isDataStreamTableScan(s) =>
-          FlinkLogicalDataStreamTableScan.create(cluster,
+          FlinkLogicalDataStreamTableScan.create(
+            cluster,
             s.getTable.asInstanceOf[FlinkPreparingTableBase])
       }
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 6426436..e28e1e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -22,10 +22,12 @@ import org.apache.flink.sql.parser.ExtendedSqlNode
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
 
+import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.NullCollation
 import org.apache.calcite.plan._
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
@@ -190,6 +192,8 @@ class FlinkPlannerImpl(
     }
 
     override def getCluster: RelOptCluster = cluster
+
+    override def getTableHints: util.List[RelHint] = ImmutableList.of()
   }
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala
deleted file mode 100644
index d9d8b33..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.calcite
-
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner, RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataTypeFactory
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rex.RexBuilder
-
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
-
-/**
-  * Flink specific [[RelOptCluster]] to use [[FlinkRelMetadataQuery]]
-  * instead of [[RelMetadataQuery]].
-  */
-class FlinkRelOptCluster(
-    planner: RelOptPlanner,
-    typeFactory: RelDataTypeFactory,
-    rexBuilder: RexBuilder,
-    nextCorrel: AtomicInteger,
-    mapCorrelToRel: util.Map[String, RelNode])
-  extends RelOptCluster(planner, typeFactory, rexBuilder, nextCorrel, mapCorrelToRel) {
-
-  private var fmq: FlinkRelMetadataQuery = _
-
-  /**
-    * Returns the current [[FlinkRelMetadataQuery]] instead of [[RelMetadataQuery]].
-    *
-    * <p>This method might be changed or moved in future.
-    * If you have a [[RelOptRuleCall]] available,
-    * for example if you are in a [[RelOptRule#onMatch(RelOptRuleCall)]]
-    * method, then use [[RelOptRuleCall#getMetadataQuery()]] instead.
-    */
-  override def getMetadataQuery: RelMetadataQuery = {
-    if (fmq == null) {
-      fmq = FlinkRelMetadataQuery.instance()
-    }
-    fmq
-  }
-
-  /**
-    * Should be called whenever the current [[FlinkRelMetadataQuery]] becomes
-    * invalid. Typically invoked from [[RelOptRuleCall#transformTo]].
-    */
-  override def invalidateMetadataQuery(): Unit = fmq = null
-}
-
-object FlinkRelOptCluster {
-  /** Creates a FlinkRelOptCluster instance. */
-  def create(planner: RelOptPlanner, rexBuilder: RexBuilder): RelOptCluster =
-    new FlinkRelOptCluster(
-      planner,
-      rexBuilder.getTypeFactory,
-      rexBuilder,
-      new AtomicInteger(0),
-      new util.HashMap[String, RelNode])
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
index fa6eb10..b6bec80 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
@@ -35,6 +35,8 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.fun.SqlStdOperatorTable.FINAL
 
+import java.util.{Collections => JCollections}
+
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -464,6 +466,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
       updatedAggCalls)
   }
 
+  override def visit(modify: LogicalTableModify): RelNode = {
+    val input = modify.getInput.accept(this)
+    modify.copy(modify.getTraitSet, JCollections.singletonList(input))
+  }
 }
 
 object RelTimeIndicatorConverter {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
index 532e9df..5da5d9b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
@@ -21,12 +21,16 @@ package org.apache.flink.table.planner.plan.nodes.common
 import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
 import org.apache.flink.table.planner.plan.nodes.{ExpressionFormat, FlinkRelNode}
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil.{conditionToString, preferExpressionFormat}
+
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexProgram}
 
+import java.util.Collections
+
 import scala.collection.JavaConversions._
 
 /**
@@ -37,7 +41,7 @@ abstract class CommonCalc(
     traitSet: RelTraitSet,
     input: RelNode,
     calcProgram: RexProgram)
-  extends Calc(cluster, traitSet, input, calcProgram)
+  extends Calc(cluster, traitSet, Collections.emptyList[RelHint](), input, calcProgram)
   with FlinkRelNode {
 
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
index 476fe4a..2307c72 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -26,12 +26,14 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalAggregate
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
+import java.util.Collections
 
 import scala.collection.JavaConversions._
 
@@ -48,7 +50,8 @@ class FlinkLogicalAggregate(
     aggCalls: util.List[AggregateCall],
     /* flag indicating whether to skip SplitAggregateRule */
     var partialFinalType: PartialFinalType = PartialFinalType.NONE)
-  extends Aggregate(cluster, traitSet, child, groupSet, groupSets, aggCalls)
+  extends Aggregate(cluster, traitSet, Collections.emptyList[RelHint](),
+    child, groupSet, groupSets, aggCalls)
   with FlinkLogicalRel {
 
   def setPartialFinalType(partialFinalType: PartialFinalType): Unit = {
@@ -78,7 +81,6 @@ class FlinkLogicalAggregate(
       planner.getCostFactory.makeCost(rowCnt, cpuCost, rowCnt * rowSize)
     }
   }
-
 }
 
 private class FlinkLogicalAggregateBatchConverter
@@ -157,6 +159,6 @@ object FlinkLogicalAggregate {
       aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
     val cluster = input.getCluster
     val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
-    new FlinkLogicalAggregate(cluster,traitSet, input, groupSet, groupSets, aggCalls)
+    new FlinkLogicalAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
index b104a66..d630e85 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
@@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
 
 import java.util
+import java.util.Collections
 import java.util.function.Supplier
 
 /**
@@ -40,7 +42,7 @@ class FlinkLogicalDataStreamTableScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable)
-  extends TableScan(cluster, traitSet, table)
+  extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), table)
   with FlinkLogicalRel {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
index cfe6f71..cbc8396 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -24,10 +24,13 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType}
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
 
+import java.util.Collections
+
 import scala.collection.JavaConversions._
 
 /**
@@ -41,7 +44,8 @@ class FlinkLogicalJoin(
     right: RelNode,
     condition: RexNode,
     joinType: JoinRelType)
-  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId], joinType)
+  extends Join(cluster, traitSet, Collections.emptyList[RelHint](),
+    left, right, condition, Set.empty[CorrelationId], joinType)
   with FlinkLogicalRel {
 
   override def copy(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index f28328d..396318a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -28,11 +28,13 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
 
 import java.util
+import java.util.Collections
 import java.util.function.Supplier
 
 /**
@@ -43,7 +45,7 @@ class FlinkLogicalTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     relOptTable: TableSourceTable[_])
-  extends TableScan(cluster, traitSet, relOptTable)
+  extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), relOptTable)
   with FlinkLogicalRel {
 
   lazy val tableSource: TableSource[_] = tableSourceTable.tableSource
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
index f2d6c51..37ff255 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
@@ -21,11 +21,11 @@ package org.apache.flink.table.planner.plan.stats
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
 import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
 
+import com.google.common.collect.ImmutableList
 import org.apache.calcite.rel.{RelCollation, RelDistribution, RelReferentialConstraint}
 import org.apache.calcite.schema.Statistic
 import org.apache.calcite.util.ImmutableBitSet
 
-import java.lang.Double
 import java.util
 
 import scala.collection.JavaConversions._
@@ -80,7 +80,7 @@ class FlinkStatistic(
     *
     * @return The number of rows of the table.
     */
-  override def getRowCount: Double = {
+  override def getRowCount: java.lang.Double = {
     if (tableStats != TableStats.UNKNOWN) {
       val rowCount = tableStats.getRowCount.toDouble
       // rowCount requires non-negative number
@@ -136,6 +136,8 @@ class FlinkStatistic(
     }
     builder.toString()
   }
+
+  override def getKeys: util.List[ImmutableBitSet] = ImmutableList.of()
 }
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 674fb2b..760306e8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -128,10 +128,6 @@ object AggregateUtil extends Enumeration {
       require(auxGroupCalls.isEmpty,
         "AUXILIARY_GROUP aggCalls should be empty when groupSet is empty")
     }
-    if (agg.indicator) {
-      require(auxGroupCalls.isEmpty,
-        "AUXILIARY_GROUP aggCalls should be empty when indicator is true")
-    }
 
     val auxGrouping = auxGroupCalls.map(_.getArgList.head.toInt).toArray
     require(auxGrouping.length + otherAggCalls.length == aggCalls.length)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala
index 4f74d24..077c8c1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala
@@ -72,6 +72,8 @@ class DefaultRelShuttle extends RelShuttle {
   override def visit(join: LogicalJoin): RelNode = visit(join.asInstanceOf[RelNode])
 
   override def visit(correlate: LogicalCorrelate): RelNode = visit(correlate.asInstanceOf[RelNode])
+
+  override def visit(modify: LogicalTableModify): RelNode = visit(modify.asInstanceOf[RelNode])
 }
 
 /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
index 84397c9..d4a6d3b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.sinks
 
-import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -27,7 +26,6 @@ import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.operations.CatalogSinkModifyOperation
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.utils.RelOptUtils
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 import org.apache.flink.table.sinks._
@@ -41,6 +39,9 @@ import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataT
 import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
 import org.apache.flink.types.Row
 
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+
 import _root_.scala.collection.JavaConversions._
 
 object TableSinkUtils {
@@ -78,7 +79,7 @@ object TableSinkUtils {
         val castedDataType = typeFactory.buildRelNodeRowType(
           sinkLogicalType.getFieldNames,
           sinkLogicalType.getFields.map(_.getType))
-        RelOptUtils.createCastRel(query, castedDataType)
+        RelOptUtil.createCastRel(query, castedDataType, true)
       }
     } else {
       // format query and sink schema strings
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
index efed57a..074a072 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
@@ -59,7 +59,6 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => TableTestUtil) extends
     val tableUtil = tableTestUtil(this)
     val tableEnv = tableUtil.tableEnv
     tableUtil.addDataStream[(Int, String, Int)]("t1", 'a, 'b, 'c)
-    val catalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog).get()
     tableEnv.createTemporaryView("view1", tableEnv.from("t1"))
     tableEnv.createTemporaryView("view2", tableEnv.from("view1"))
     tableEnv.createTemporaryView("view3", tableEnv.from("view2"))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index 0fa3485..e4ddf88 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -19,13 +19,16 @@
 package org.apache.flink.table.planner.plan.metadata
 
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
 import org.apache.flink.table.planner.calcite.{FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.planner.plan.schema._
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.{JDouble, JLong}
+import org.apache.flink.table.utils.CatalogManagerMocks
 import org.apache.flink.util.Preconditions
+
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
@@ -45,10 +48,8 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
-import java.math.BigDecimal
 
-import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.utils.CatalogManagerMocks
+import java.math.BigDecimal
 
 import scala.collection.JavaConversions._
 
@@ -634,4 +635,3 @@ object AggCallSelectivityEstimatorTest {
   }
 
 }
-
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
index 911d732..031a668 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
@@ -26,7 +26,9 @@ import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, F
 import org.apache.flink.table.planner.plan.schema._
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.{JDouble, JLong}
+import org.apache.flink.table.utils.CatalogManagerMocks
 import org.apache.flink.util.Preconditions
+
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
@@ -43,11 +45,10 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
+
 import java.math.BigDecimal
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.table.utils.CatalogManagerMocks
-
 import scala.collection.JavaConverters._
 
 /**
@@ -1111,6 +1112,4 @@ object SelectivityEstimatorTest {
       .THREAD_PROVIDERS
       .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE))
   }
-
 }
-
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala
index 8b6039b..10cec79 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.tools.RuleSets
 import org.junit.Assert._
 import org.junit.Test
 
+import java.util.Collections
+
 import scala.collection.JavaConversions._
 
 /**
@@ -49,10 +51,7 @@ class FlinkChainedProgramTest {
       .addRuleInstance(SubQueryRemoveRule.JOIN)
       .addMatchLimit(100)
       .addMatchOrder(HepMatchOrder.BOTTOM_UP)
-      .addRuleCollection(Array(
-        TableScanRule.INSTANCE,
-        ValuesReduceRule.FILTER_INSTANCE
-      ).toList)
+      .addRuleCollection(Collections.singletonList(ValuesReduceRule.FILTER_INSTANCE))
     val program1 = FlinkHepProgram(builder.build())
     assertTrue(programs.addFirst("o2", program1))
     assertEquals(List("o2"), programs.getProgramNames.toList)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index d3aaf7d..7f85245 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -34,6 +34,8 @@ import org.apache.flink.table.functions.sql.ProctimeSqlFunction
 import org.apache.flink.table.plan.logical.rel._
 import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
 
+import java.util.{Collections => JCollections}
+
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -377,6 +379,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
       rexBuilder,
       inputs.flatMap(_.getRowType.getFieldList.map(_.getType)))
   }
+
+  override def visit(modify: LogicalTableModify): RelNode = {
+    val input = modify.getInput.accept(this)
+    modify.copy(modify.getTraitSet, JCollections.singletonList(input))
+  }
 }
 
 object RelTimeIndicatorConverter {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index 754509e..957bdf6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -70,6 +70,8 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic {
 
   override def getReferentialConstraints: util.List[RelReferentialConstraint] =
     Collections.emptyList()
+
+  override def getKeys: util.List[ImmutableBitSet] = Collections.emptyList()
 }
 
 /**