You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:19 UTC
[flink] 05/13: [FLINK-14338][table-planner][table-planner-blink]
Tweak implementations due to API change
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7dad5c47c92db305dc39f58acb1f8cf88b6eaed1
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 20:36:40 2020 +0800
[FLINK-14338][table-planner][table-planner-blink] Tweak implementations due to API change
* Replace RelOptUtils.createCastRel with RelOptUtil.createCastRel
* Implement RelOptTable#getKeys and Statistic#getKeys
* Changes logical nodes constructor for hints
* Implement RelShuttle.visit(LogicalTableModify)
---
.../table/planner/catalog/SqlCatalogViewTable.java | 4 +-
.../plan/schema/FlinkPreparingTableBase.java | 5 ++
.../table/planner/plan/utils/RelOptUtils.java | 85 ----------------------
.../planner/calcite/FlinkLogicalRelFactories.scala | 17 ++++-
.../table/planner/calcite/FlinkPlannerImpl.scala | 4 +
.../table/planner/calcite/FlinkRelOptCluster.scala | 77 --------------------
.../calcite/RelTimeIndicatorConverter.scala | 6 ++
.../planner/plan/nodes/common/CommonCalc.scala | 6 +-
.../plan/nodes/logical/FlinkLogicalAggregate.scala | 8 +-
.../logical/FlinkLogicalDataStreamTableScan.scala | 4 +-
.../plan/nodes/logical/FlinkLogicalJoin.scala | 6 +-
.../logical/FlinkLogicalTableSourceScan.scala | 4 +-
.../table/planner/plan/stats/FlinkStatistic.scala | 6 +-
.../table/planner/plan/utils/AggregateUtil.scala | 4 -
.../table/planner/plan/utils/RelShuttles.scala | 2 +
.../flink/table/planner/sinks/TableSinkUtils.scala | 7 +-
.../planner/plan/common/ViewsExpandingTest.scala | 1 -
.../metadata/AggCallSelectivityEstimatorTest.scala | 10 +--
.../plan/metadata/SelectivityEstimatorTest.scala | 7 +-
.../optimize/program/FlinkChainedProgramTest.scala | 7 +-
.../table/calcite/RelTimeIndicatorConverter.scala | 7 ++
.../flink/table/plan/stats/FlinkStatistic.scala | 2 +
22 files changed, 81 insertions(+), 198 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
index a9f56e1..cf2fb35 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/SqlCatalogViewTable.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.planner.catalog;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
-import org.apache.flink.table.planner.plan.utils.RelOptUtils;
import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
@@ -57,6 +57,6 @@ public class SqlCatalogViewTable extends ExpandingPreparingTable {
RelNode original = context
.expandView(rowType, view.getExpandedQuery(), viewPath, names)
.project();
- return RelOptUtils.createCastRel(original, rowType);
+ return RelOptUtil.createCastRel(original, rowType, true);
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java
index ef4ff37..98d1711 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/FlinkPreparingTableBase.java
@@ -268,6 +268,11 @@ public abstract class FlinkPreparingTableBase extends Prepare.AbstractPreparingT
}
}
+ @Override
+ public List<ImmutableBitSet> getKeys() {
+ return statistic.getKeys();
+ }
+
/**
* Returns unique keySets of current table.
*/
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java
deleted file mode 100644
index d3dfb3e..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelOptUtils.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.utils;
-
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * <code>RelOptUtils</code> defines static utility methods for use in optimizing
- * {@link RelNode}s.
- *
- * <p>This is an extension of {@link org.apache.calcite.plan.RelOptUtil}.
- */
-public class RelOptUtils {
- /**
- * Creates a projection which casts a rel's output to a desired row type.
- *
- * <p>This method is inspired by {@link RelOptUtil#createCastRel}, different with that,
- * we do not generate another {@link Project} if the {@code rel} is already a {@link Project}.
- *
- * @param rel Producer of rows to be converted
- * @param castRowType Row type after cast
- * @return Conversion rel with castRowType
- */
- public static RelNode createCastRel(RelNode rel, RelDataType castRowType) {
- RelFactories.ProjectFactory projectFactory = RelFactories.DEFAULT_PROJECT_FACTORY;
- final RelDataType oriRowType = rel.getRowType();
- if (RelOptUtil.areRowTypesEqual(oriRowType, castRowType, true)) {
- // nothing to do
- return rel;
- }
- final RexBuilder rexBuilder = rel.getCluster().getRexBuilder();
-
- final List<RelDataTypeField> fieldList = oriRowType.getFieldList();
- int n = fieldList.size();
- assert n == castRowType.getFieldCount()
- : "field count: lhs [" + castRowType + "] rhs [" + oriRowType + "]";
-
- final List<RexNode> rhsExps;
- final RelNode input;
- if (rel instanceof Project) {
- rhsExps = ((Project) rel).getProjects();
- // Avoid to generate redundant project node.
- input = rel.getInput(0);
- } else {
- rhsExps = new ArrayList<>();
- for (RelDataTypeField field : fieldList) {
- rhsExps.add(rexBuilder.makeInputRef(field.getType(), field.getIndex()));
- }
- input = rel;
- }
-
- final List<RexNode> castExps =
- RexUtil.generateCastExpressions(rexBuilder, castRowType, rhsExps);
- // Use names and types from castRowType.
- return projectFactory.createProject(input, castExps,
- castRowType.getFieldNames());
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
index 99260d3..56ddbf0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
@@ -25,10 +25,12 @@ import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
import org.apache.flink.table.sinks.TableSink
import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptTable.ToRelContext
import org.apache.calcite.plan.{Contexts, RelOptCluster, RelOptTable}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
import org.apache.calcite.rel.core.RelFactories._
import org.apache.calcite.rel.core._
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical._
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rex._
@@ -82,6 +84,7 @@ object FlinkLogicalRelFactories {
class ProjectFactoryImpl extends ProjectFactory {
def createProject(
input: RelNode,
+ hints: util.List[RelHint],
childExprs: util.List[_ <: RexNode],
fieldNames: util.List[String]): RelNode = {
val rexBuilder = input.getCluster.getRexBuilder
@@ -134,6 +137,7 @@ object FlinkLogicalRelFactories {
class AggregateFactoryImpl extends AggregateFactory {
def createAggregate(
input: RelNode,
+ hints: util.List[RelHint],
groupSet: ImmutableBitSet,
groupSets: ImmutableList[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): RelNode = {
@@ -168,6 +172,7 @@ object FlinkLogicalRelFactories {
def createJoin(
left: RelNode,
right: RelNode,
+ hints: util.List[RelHint],
condition: RexNode,
variablesSet: util.Set[CorrelationId],
joinType: JoinRelType,
@@ -208,14 +213,18 @@ object FlinkLogicalRelFactories {
* [[FlinkLogicalTableSourceScan]] or [[FlinkLogicalDataStreamTableScan]].
*/
class TableScanFactoryImpl extends TableScanFactory {
- def createScan(cluster: RelOptCluster, table: RelOptTable): RelNode = {
- val tableScan = LogicalTableScan.create(cluster, table)
+ def createScan(toRelContext: ToRelContext, table: RelOptTable): RelNode = {
+ val cluster = toRelContext.getCluster
+ val hints = toRelContext.getTableHints
+ val tableScan = LogicalTableScan.create(cluster, table, hints)
tableScan match {
case s: LogicalTableScan if FlinkLogicalTableSourceScan.isTableSourceScan(s) =>
- FlinkLogicalTableSourceScan.create(cluster,
+ FlinkLogicalTableSourceScan.create(
+ cluster,
s.getTable.asInstanceOf[FlinkPreparingTableBase])
case s: LogicalTableScan if FlinkLogicalDataStreamTableScan.isDataStreamTableScan(s) =>
- FlinkLogicalDataStreamTableScan.create(cluster,
+ FlinkLogicalDataStreamTableScan.create(
+ cluster,
s.getTable.asInstanceOf[FlinkPreparingTableBase])
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 6426436..e28e1e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -22,10 +22,12 @@ import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader
+import com.google.common.collect.ImmutableList
import org.apache.calcite.config.NullCollation
import org.apache.calcite.plan._
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
@@ -190,6 +192,8 @@ class FlinkPlannerImpl(
}
override def getCluster: RelOptCluster = cluster
+
+ override def getTableHints: util.List[RelHint] = ImmutableList.of()
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala
deleted file mode 100644
index d9d8b33..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelOptCluster.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.calcite
-
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner, RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataTypeFactory
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rex.RexBuilder
-
-import java.util
-import java.util.concurrent.atomic.AtomicInteger
-
-/**
- * Flink specific [[RelOptCluster]] to use [[FlinkRelMetadataQuery]]
- * instead of [[RelMetadataQuery]].
- */
-class FlinkRelOptCluster(
- planner: RelOptPlanner,
- typeFactory: RelDataTypeFactory,
- rexBuilder: RexBuilder,
- nextCorrel: AtomicInteger,
- mapCorrelToRel: util.Map[String, RelNode])
- extends RelOptCluster(planner, typeFactory, rexBuilder, nextCorrel, mapCorrelToRel) {
-
- private var fmq: FlinkRelMetadataQuery = _
-
- /**
- * Returns the current [[FlinkRelMetadataQuery]] instead of [[RelMetadataQuery]].
- *
- * <p>This method might be changed or moved in future.
- * If you have a [[RelOptRuleCall]] available,
- * for example if you are in a [[RelOptRule#onMatch(RelOptRuleCall)]]
- * method, then use [[RelOptRuleCall#getMetadataQuery()]] instead.
- */
- override def getMetadataQuery: RelMetadataQuery = {
- if (fmq == null) {
- fmq = FlinkRelMetadataQuery.instance()
- }
- fmq
- }
-
- /**
- * Should be called whenever the current [[FlinkRelMetadataQuery]] becomes
- * invalid. Typically invoked from [[RelOptRuleCall#transformTo]].
- */
- override def invalidateMetadataQuery(): Unit = fmq = null
-}
-
-object FlinkRelOptCluster {
- /** Creates a FlinkRelOptCluster instance. */
- def create(planner: RelOptPlanner, rexBuilder: RexBuilder): RelOptCluster =
- new FlinkRelOptCluster(
- planner,
- rexBuilder.getTypeFactory,
- rexBuilder,
- new AtomicInteger(0),
- new util.HashMap[String, RelNode])
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
index fa6eb10..b6bec80 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
@@ -35,6 +35,8 @@ import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.fun.SqlStdOperatorTable.FINAL
+import java.util.{Collections => JCollections}
+
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -464,6 +466,10 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
updatedAggCalls)
}
+ override def visit(modify: LogicalTableModify): RelNode = {
+ val input = modify.getInput.accept(this)
+ modify.copy(modify.getTraitSet, JCollections.singletonList(input))
+ }
}
object RelTimeIndicatorConverter {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
index 532e9df..5da5d9b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
@@ -21,12 +21,16 @@ package org.apache.flink.table.planner.plan.nodes.common
import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
import org.apache.flink.table.planner.plan.nodes.{ExpressionFormat, FlinkRelNode}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil.{conditionToString, preferExpressionFormat}
+
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexProgram}
+import java.util.Collections
+
import scala.collection.JavaConversions._
/**
@@ -37,7 +41,7 @@ abstract class CommonCalc(
traitSet: RelTraitSet,
input: RelNode,
calcProgram: RexProgram)
- extends Calc(cluster, traitSet, input, calcProgram)
+ extends Calc(cluster, traitSet, Collections.emptyList[RelHint](), input, calcProgram)
with FlinkRelNode {
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
index 476fe4a..2307c72 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -26,12 +26,14 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalAggregate
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.ImmutableBitSet
import java.util
+import java.util.Collections
import scala.collection.JavaConversions._
@@ -48,7 +50,8 @@ class FlinkLogicalAggregate(
aggCalls: util.List[AggregateCall],
/* flag indicating whether to skip SplitAggregateRule */
var partialFinalType: PartialFinalType = PartialFinalType.NONE)
- extends Aggregate(cluster, traitSet, child, groupSet, groupSets, aggCalls)
+ extends Aggregate(cluster, traitSet, Collections.emptyList[RelHint](),
+ child, groupSet, groupSets, aggCalls)
with FlinkLogicalRel {
def setPartialFinalType(partialFinalType: PartialFinalType): Unit = {
@@ -78,7 +81,6 @@ class FlinkLogicalAggregate(
planner.getCostFactory.makeCost(rowCnt, cpuCost, rowCnt * rowSize)
}
}
-
}
private class FlinkLogicalAggregateBatchConverter
@@ -157,6 +159,6 @@ object FlinkLogicalAggregate {
aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
val cluster = input.getCluster
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
- new FlinkLogicalAggregate(cluster,traitSet, input, groupSet, groupSets, aggCalls)
+ new FlinkLogicalAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
index b104a66..d630e85 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
@@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import java.util
+import java.util.Collections
import java.util.function.Supplier
/**
@@ -40,7 +42,7 @@ class FlinkLogicalDataStreamTableScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable)
- extends TableScan(cluster, traitSet, table)
+ extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), table)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
index cfe6f71..cbc8396 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -24,10 +24,13 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType}
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode
+import java.util.Collections
+
import scala.collection.JavaConversions._
/**
@@ -41,7 +44,8 @@ class FlinkLogicalJoin(
right: RelNode,
condition: RexNode,
joinType: JoinRelType)
- extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId], joinType)
+ extends Join(cluster, traitSet, Collections.emptyList[RelHint](),
+ left, right, condition, Set.empty[CorrelationId], joinType)
with FlinkLogicalRel {
override def copy(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index f28328d..396318a7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -28,11 +28,13 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
import java.util
+import java.util.Collections
import java.util.function.Supplier
/**
@@ -43,7 +45,7 @@ class FlinkLogicalTableSourceScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
relOptTable: TableSourceTable[_])
- extends TableScan(cluster, traitSet, relOptTable)
+ extends TableScan(cluster, traitSet, Collections.emptyList[RelHint](), relOptTable)
with FlinkLogicalRel {
lazy val tableSource: TableSource[_] = tableSourceTable.tableSource
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
index f2d6c51..37ff255 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/stats/FlinkStatistic.scala
@@ -21,11 +21,11 @@ package org.apache.flink.table.planner.plan.stats
import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
+import com.google.common.collect.ImmutableList
import org.apache.calcite.rel.{RelCollation, RelDistribution, RelReferentialConstraint}
import org.apache.calcite.schema.Statistic
import org.apache.calcite.util.ImmutableBitSet
-import java.lang.Double
import java.util
import scala.collection.JavaConversions._
@@ -80,7 +80,7 @@ class FlinkStatistic(
*
* @return The number of rows of the table.
*/
- override def getRowCount: Double = {
+ override def getRowCount: java.lang.Double = {
if (tableStats != TableStats.UNKNOWN) {
val rowCount = tableStats.getRowCount.toDouble
// rowCount requires non-negative number
@@ -136,6 +136,8 @@ class FlinkStatistic(
}
builder.toString()
}
+
+ override def getKeys: util.List[ImmutableBitSet] = ImmutableList.of()
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 674fb2b..760306e8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -128,10 +128,6 @@ object AggregateUtil extends Enumeration {
require(auxGroupCalls.isEmpty,
"AUXILIARY_GROUP aggCalls should be empty when groupSet is empty")
}
- if (agg.indicator) {
- require(auxGroupCalls.isEmpty,
- "AUXILIARY_GROUP aggCalls should be empty when indicator is true")
- }
val auxGrouping = auxGroupCalls.map(_.getArgList.head.toInt).toArray
require(auxGrouping.length + otherAggCalls.length == aggCalls.length)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala
index 4f74d24..077c8c1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelShuttles.scala
@@ -72,6 +72,8 @@ class DefaultRelShuttle extends RelShuttle {
override def visit(join: LogicalJoin): RelNode = visit(join.asInstanceOf[RelNode])
override def visit(correlate: LogicalCorrelate): RelNode = visit(correlate.asInstanceOf[RelNode])
+
+ override def visit(modify: LogicalTableModify): RelNode = visit(modify.asInstanceOf[RelNode])
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
index 84397c9..d4a6d3b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.sinks
-import org.apache.calcite.rel.RelNode
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -27,7 +26,6 @@ import org.apache.flink.table.catalog.{CatalogTable, ObjectIdentifier}
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.operations.CatalogSinkModifyOperation
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.utils.RelOptUtils
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
import org.apache.flink.table.sinks._
@@ -41,6 +39,9 @@ import org.apache.flink.table.types.utils.TypeConversions.{fromLegacyInfoToDataT
import org.apache.flink.table.utils.{TableSchemaUtils, TypeMappingUtils}
import org.apache.flink.types.Row
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+
import _root_.scala.collection.JavaConversions._
object TableSinkUtils {
@@ -78,7 +79,7 @@ object TableSinkUtils {
val castedDataType = typeFactory.buildRelNodeRowType(
sinkLogicalType.getFieldNames,
sinkLogicalType.getFields.map(_.getType))
- RelOptUtils.createCastRel(query, castedDataType)
+ RelOptUtil.createCastRel(query, castedDataType, true)
}
} else {
// format query and sink schema strings
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
index efed57a..074a072 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
@@ -59,7 +59,6 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => TableTestUtil) extends
val tableUtil = tableTestUtil(this)
val tableEnv = tableUtil.tableEnv
tableUtil.addDataStream[(Int, String, Int)]("t1", 'a, 'b, 'c)
- val catalog = tableEnv.getCatalog(tableEnv.getCurrentCatalog).get()
tableEnv.createTemporaryView("view1", tableEnv.from("t1"))
tableEnv.createTemporaryView("view2", tableEnv.from("view1"))
tableEnv.createTemporaryView("view3", tableEnv.from("view2"))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index 0fa3485..e4ddf88 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -19,13 +19,16 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
import org.apache.flink.table.planner.calcite.{FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.planner.plan.schema._
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.{JDouble, JLong}
+import org.apache.flink.table.utils.CatalogManagerMocks
import org.apache.flink.util.Preconditions
+
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
import org.apache.calcite.rel.`type`.RelDataType
@@ -45,10 +48,8 @@ import org.junit.{Before, BeforeClass, Test}
import org.powermock.api.mockito.PowerMockito._
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
-import java.math.BigDecimal
-import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.utils.CatalogManagerMocks
+import java.math.BigDecimal
import scala.collection.JavaConversions._
@@ -634,4 +635,3 @@ object AggCallSelectivityEstimatorTest {
}
}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
index 911d732..031a668 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
@@ -26,7 +26,9 @@ import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl, F
import org.apache.flink.table.planner.plan.schema._
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.{JDouble, JLong}
+import org.apache.flink.table.utils.CatalogManagerMocks
import org.apache.flink.util.Preconditions
+
import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
@@ -43,11 +45,10 @@ import org.junit.{Before, BeforeClass, Test}
import org.powermock.api.mockito.PowerMockito._
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
+
import java.math.BigDecimal
import java.sql.{Date, Time, Timestamp}
-import org.apache.flink.table.utils.CatalogManagerMocks
-
import scala.collection.JavaConverters._
/**
@@ -1111,6 +1112,4 @@ object SelectivityEstimatorTest {
.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE))
}
-
}
-
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala
index 8b6039b..10cec79 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChainedProgramTest.scala
@@ -26,6 +26,8 @@ import org.apache.calcite.tools.RuleSets
import org.junit.Assert._
import org.junit.Test
+import java.util.Collections
+
import scala.collection.JavaConversions._
/**
@@ -49,10 +51,7 @@ class FlinkChainedProgramTest {
.addRuleInstance(SubQueryRemoveRule.JOIN)
.addMatchLimit(100)
.addMatchOrder(HepMatchOrder.BOTTOM_UP)
- .addRuleCollection(Array(
- TableScanRule.INSTANCE,
- ValuesReduceRule.FILTER_INSTANCE
- ).toList)
+ .addRuleCollection(Collections.singletonList(ValuesReduceRule.FILTER_INSTANCE))
val program1 = FlinkHepProgram(builder.build())
assertTrue(programs.addFirst("o2", program1))
assertEquals(List("o2"), programs.getProgramNames.toList)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index d3aaf7d..7f85245 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -34,6 +34,8 @@ import org.apache.flink.table.functions.sql.ProctimeSqlFunction
import org.apache.flink.table.plan.logical.rel._
import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import java.util.{Collections => JCollections}
+
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -377,6 +379,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
rexBuilder,
inputs.flatMap(_.getRowType.getFieldList.map(_.getType)))
}
+
+ override def visit(modify: LogicalTableModify): RelNode = {
+ val input = modify.getInput.accept(this)
+ modify.copy(modify.getTraitSet, JCollections.singletonList(input))
+ }
}
object RelTimeIndicatorConverter {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index 754509e..957bdf6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -70,6 +70,8 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic {
override def getReferentialConstraints: util.List[RelReferentialConstraint] =
Collections.emptyList()
+
+ override def getKeys: util.List[ImmutableBitSet] = Collections.emptyList()
}
/**