You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/09 09:40:18 UTC
[flink] branch master updated: [FLINK-12392][table-planner-blink]
Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala
2.12
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ca8d9ca [FLINK-12392][table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
ca8d9ca is described below
commit ca8d9cab87fa082a0939ce51b8369b75691df3a4
Author: godfreyhe <go...@163.com>
AuthorDate: Thu May 9 11:43:36 2019 +0800
[FLINK-12392][table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
This closes #8376
---
.../table/plan/metadata/FlinkRelMetadataQuery.java | 250 +++++++++++++++++++++
.../plan/metadata/FlinkRelMetadataQuery.scala | 220 ------------------
.../flink/table/runtime/utils/StreamTestSink.scala | 8 +-
3 files changed, 254 insertions(+), 224 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.java
new file mode 100644
index 0000000..1ede114
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.metadata;
+
+import org.apache.flink.table.plan.stats.ValueInterval;
+import org.apache.flink.table.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.plan.trait.RelModifiedMonotonicity;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/**
+ * A RelMetadataQuery that defines extended metadata handlers in Flink,
+ * e.g. ColumnInterval, ColumnNullCount.
+ */
+public class FlinkRelMetadataQuery extends RelMetadataQuery {
+
+ protected static final FlinkRelMetadataQuery PROTOTYPE = new FlinkRelMetadataQuery(false);
+
+ private FlinkMetadata.ColumnInterval.Handler columnIntervalHandler;
+ private FlinkMetadata.FilteredColumnInterval.Handler filteredColumnInterval;
+ private FlinkMetadata.ColumnNullCount.Handler columnNullCountHandler;
+ private FlinkMetadata.ColumnOriginNullCount.Handler columnOriginNullCountHandler;
+ private FlinkMetadata.UniqueGroups.Handler uniqueGroupsHandler;
+ private FlinkMetadata.FlinkDistribution.Handler distributionHandler;
+ private FlinkMetadata.ModifiedMonotonicity.Handler modifiedMonotonicityHandler;
+
+ /**
+ * Returns a new instance of FlinkRelMetadataQuery. It ensures that cycles do not
+ * occur while computing metadata.
+ */
+ public static FlinkRelMetadataQuery instance() {
+ return new FlinkRelMetadataQuery();
+ }
+
+ /**
+ * Reuses the input metadataQuery instance if it can be cast to FlinkRelMetadataQuery,
+ * or creates a new one if not.
+ *
+ * @param mq the metadataQuery to try to reuse
+ * @return a FlinkRelMetadataQuery instance
+ */
+ public static FlinkRelMetadataQuery reuseOrCreate(RelMetadataQuery mq) {
+ if (mq instanceof FlinkRelMetadataQuery) {
+ return (FlinkRelMetadataQuery) mq;
+ } else {
+ return instance();
+ }
+ }
+
+ private FlinkRelMetadataQuery(
+ JaninoRelMetadataProvider metadataProvider,
+ RelMetadataQuery prototype) {
+ super(metadataProvider, prototype);
+ }
+
+ private FlinkRelMetadataQuery() {
+ super(RelMetadataQuery.THREAD_PROVIDERS.get(), RelMetadataQuery.EMPTY);
+ this.columnIntervalHandler = PROTOTYPE.columnIntervalHandler;
+ this.filteredColumnInterval = PROTOTYPE.filteredColumnInterval;
+ this.columnNullCountHandler = PROTOTYPE.columnNullCountHandler;
+ this.columnOriginNullCountHandler = PROTOTYPE.columnOriginNullCountHandler;
+ this.uniqueGroupsHandler = PROTOTYPE.uniqueGroupsHandler;
+ this.distributionHandler = PROTOTYPE.distributionHandler;
+ this.modifiedMonotonicityHandler = PROTOTYPE.modifiedMonotonicityHandler;
+ }
+
+ /**
+ * Creates and initializes the instance that will serve as a prototype for
+ * all other instances. The dummy argument is unused and only distinguishes this constructor.
+ */
+ private FlinkRelMetadataQuery(boolean dummy) {
+ super(RelMetadataQuery.THREAD_PROVIDERS.get(), RelMetadataQuery.EMPTY);
+ this.columnIntervalHandler =
+ RelMetadataQuery.initialHandler(FlinkMetadata.ColumnInterval.Handler.class);
+ this.filteredColumnInterval =
+ RelMetadataQuery.initialHandler(FlinkMetadata.FilteredColumnInterval.Handler.class);
+ this.columnNullCountHandler =
+ RelMetadataQuery.initialHandler(FlinkMetadata.ColumnNullCount.Handler.class);
+ this.columnOriginNullCountHandler =
+ RelMetadataQuery.initialHandler(FlinkMetadata.ColumnOriginNullCount.Handler.class);
+ this.uniqueGroupsHandler =
+ RelMetadataQuery.initialHandler(FlinkMetadata.UniqueGroups.Handler.class);
+ this.distributionHandler =
+ RelMetadataQuery.initialHandler(FlinkMetadata.FlinkDistribution.Handler.class);
+ this.modifiedMonotonicityHandler =
+ RelMetadataQuery.initialHandler(FlinkMetadata.ModifiedMonotonicity.Handler.class);
+ }
+
+ /**
+ * Returns the {@link FlinkMetadata.ColumnInterval} statistic.
+ *
+ * @param rel the relational expression
+ * @param index the index of the given column
+ * @return the interval of the given column of a specified relational expression.
+ * Returns null if the interval cannot be estimated.
+ * Returns {@link org.apache.flink.table.plan.stats.EmptyValueInterval}
+ * if the column does not contain any value except for null.
+ */
+ public ValueInterval getColumnInterval(RelNode rel, int index) {
+ for (; ; ) {
+ try {
+ return columnIntervalHandler.getColumnInterval(rel, this, index);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ columnIntervalHandler = revise(e.relClass, FlinkMetadata.ColumnInterval.DEF);
+ }
+ }
+ }
+
+ /**
+ * Returns the {@link FlinkMetadata.ColumnInterval} of the given column
+ * under the given filter argument.
+ *
+ * @param rel the relational expression
+ * @param columnIndex the index of the given column
+ * @param filterArg the index of the filter argument
+ * @return the interval of the given column of a specified relational expression.
+ * Returns null if the interval cannot be estimated.
+ * Returns {@link org.apache.flink.table.plan.stats.EmptyValueInterval}
+ * if the column does not contain any value except for null.
+ */
+ public ValueInterval getFilteredColumnInterval(RelNode rel, int columnIndex, int filterArg) {
+ for (; ; ) {
+ try {
+ return filteredColumnInterval.getFilteredColumnInterval(
+ rel, this, columnIndex, filterArg);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ filteredColumnInterval =
+ revise(e.relClass, FlinkMetadata.FilteredColumnInterval.DEF);
+ }
+ }
+ }
+
+ /**
+ * Returns the null count of the given column.
+ *
+ * @param rel the relational expression
+ * @param index the index of the given column
+ * @return the null count of the given column if it can be estimated, else null.
+ */
+ public Double getColumnNullCount(RelNode rel, int index) {
+ for (; ; ) {
+ try {
+ return columnNullCountHandler.getColumnNullCount(rel, this, index);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ columnNullCountHandler = revise(e.relClass, FlinkMetadata.ColumnNullCount.DEF);
+ }
+ }
+ }
+
+ /**
+ * Returns the origin null count of the given column.
+ *
+ * @param rel the relational expression
+ * @param index the index of the given column
+ * @return the origin null count of the given column if it can be estimated, else null.
+ */
+ public Double getColumnOriginNullCount(RelNode rel, int index) {
+ for (; ; ) {
+ try {
+ return columnOriginNullCountHandler.getColumnOriginNullCount(rel, this, index);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ columnOriginNullCountHandler =
+ revise(e.relClass, FlinkMetadata.ColumnOriginNullCount.DEF);
+ }
+ }
+ }
+
+ /**
+ * Returns the (minimum) unique groups of the given columns.
+ *
+ * @param rel the relational expression
+ * @param columns the given columns in a specified relational expression.
+ * The given columns should not be null; an empty set is returned as is.
+ * @return the (minimum) unique columns which should be a sub-collection of the given columns,
+ * and should not be null or empty. If no unique columns can be found, returns the
+ * given columns.
+ */
+ public ImmutableBitSet getUniqueGroups(RelNode rel, ImmutableBitSet columns) {
+ for (; ; ) {
+ try {
+ Preconditions.checkArgument(columns != null);
+ if (columns.isEmpty()) {
+ return columns;
+ }
+ ImmutableBitSet uniqueGroups =
+ uniqueGroupsHandler.getUniqueGroups(rel, this, columns);
+ Preconditions.checkArgument(uniqueGroups != null && !uniqueGroups.isEmpty());
+ Preconditions.checkArgument(columns.contains(uniqueGroups));
+ return uniqueGroups;
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ uniqueGroupsHandler = revise(e.relClass, FlinkMetadata.UniqueGroups.DEF);
+ }
+ }
+ }
+
+ /**
+ * Returns the {@link FlinkRelDistribution} statistic.
+ *
+ * @param rel the relational expression
+ * @return description of how the rows in the relational expression are
+ * physically distributed
+ */
+ public FlinkRelDistribution flinkDistribution(RelNode rel) {
+ for (; ; ) {
+ try {
+ return distributionHandler.flinkDistribution(rel, this);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ distributionHandler = revise(e.relClass, FlinkMetadata.FlinkDistribution.DEF);
+ }
+ }
+ }
+
+ /**
+ * Returns the {@link RelModifiedMonotonicity} statistic.
+ *
+ * @param rel the relational expression
+ * @return the monotonicity for the corresponding RelNode
+ */
+ public RelModifiedMonotonicity getRelModifiedMonotonicity(RelNode rel) {
+ for (; ; ) {
+ try {
+ return modifiedMonotonicityHandler.getRelModifiedMonotonicity(rel, this);
+ } catch (JaninoRelMetadataProvider.NoHandler e) {
+ modifiedMonotonicityHandler =
+ revise(e.relClass, FlinkMetadata.ModifiedMonotonicity.DEF);
+ }
+ }
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala
deleted file mode 100644
index 260792f..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.metadata
-
-import org.apache.flink.table.JDouble
-import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, RelModifiedMonotonicity}
-import org.apache.flink.table.plan.metadata.FlinkMetadata._
-import org.apache.flink.table.plan.stats.ValueInterval
-
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQuery}
-import org.apache.calcite.util.ImmutableBitSet
-
-/**
- * RelMetadataQuery provides a strongly-typed facade on top of
- * [[org.apache.calcite.rel.metadata.RelMetadataProvider]]
- * for the set of relational expression metadata queries defined as standard within Calcite.
- * FlinkRelMetadataQuery class is to add flink specified metadata queries.
- *
- * @param metadataProvider provider which provides metadata
- * @param prototype the prototype which provides metadata handlers
- */
-class FlinkRelMetadataQuery private(
- metadataProvider: JaninoRelMetadataProvider,
- prototype: RelMetadataQuery) extends RelMetadataQuery(metadataProvider, prototype) {
-
- private[this] var columnIntervalHandler: ColumnInterval.Handler = _
- private[this] var filteredColumnInterval: FilteredColumnInterval.Handler = _
- private[this] var columnNullCountHandler: ColumnNullCount.Handler = _
- private[this] var columnOriginNullCountHandler: ColumnOriginNullCount.Handler = _
- private[this] var uniqueGroupsHandler: UniqueGroups.Handler = _
- private[this] var distributionHandler: FlinkDistribution.Handler = _
- private[this] var modifiedMonotonicityHandler: ModifiedMonotonicity.Handler = _
-
- private def this() {
- this(RelMetadataQuery.THREAD_PROVIDERS.get, RelMetadataQuery.EMPTY)
- this.columnIntervalHandler = RelMetadataQuery.initialHandler(classOf[ColumnInterval.Handler])
- this.filteredColumnInterval =
- RelMetadataQuery.initialHandler(classOf[FilteredColumnInterval.Handler])
- this.columnNullCountHandler = RelMetadataQuery.initialHandler(classOf[ColumnNullCount.Handler])
- this.columnOriginNullCountHandler =
- RelMetadataQuery.initialHandler(classOf[ColumnOriginNullCount.Handler])
- this.uniqueGroupsHandler = RelMetadataQuery.initialHandler(classOf[UniqueGroups.Handler])
- this.distributionHandler = RelMetadataQuery.initialHandler(classOf[FlinkDistribution.Handler])
- this.modifiedMonotonicityHandler =
- RelMetadataQuery.initialHandler(classOf[ModifiedMonotonicity.Handler])
- }
-
- /**
- * Returns the [[ColumnInterval]] statistic.
- *
- * @param rel the relational expression
- * @param index the index of the given column
- * @return the interval of the given column of a specified relational expression.
- * Returns null if interval cannot be estimated,
- * Returns [[org.apache.flink.table.plan.stats.EmptyValueInterval]]
- * if column values does not contains any value except for null.
- */
- def getColumnInterval(rel: RelNode, index: Int): ValueInterval = {
- try {
- columnIntervalHandler.getColumnInterval(rel, this, index)
- } catch {
- case e: JaninoRelMetadataProvider.NoHandler =>
- columnIntervalHandler = revise(e.relClass, FlinkMetadata.ColumnInterval.DEF)
- getColumnInterval(rel, index)
- }
- }
-
- /**
- * Returns the [[ColumnInterval]] of the given column under the given filter argument.
- *
- * @param rel the relational expression
- * @param columnIndex the index of the given column
- * @param filterArg the index of the filter argument
- * @return the interval of the given column of a specified relational expression.
- * Returns null if interval cannot be estimated,
- * Returns [[org.apache.flink.table.plan.stats.EmptyValueInterval]]
- * if column values does not contains any value except for null.
- */
- def getFilteredColumnInterval(rel: RelNode, columnIndex: Int, filterArg: Int): ValueInterval = {
- try {
- filteredColumnInterval.getFilteredColumnInterval(
- rel, this, columnIndex, filterArg)
- } catch {
- case e: JaninoRelMetadataProvider.NoHandler =>
- filteredColumnInterval = revise(e.relClass, FlinkMetadata.FilteredColumnInterval.DEF)
- getFilteredColumnInterval(rel, columnIndex, filterArg)
- }
- }
-
- /**
- * Returns the null count of the given column.
- *
- * @param rel the relational expression
- * @param index the index of the given column
- * @return the null count of the given column if can be estimated, else return null.
- */
- def getColumnNullCount(rel: RelNode, index: Int): JDouble = {
- try {
- columnNullCountHandler.getColumnNullCount(rel, this, index)
- } catch {
- case e: JaninoRelMetadataProvider.NoHandler =>
- columnNullCountHandler = revise(e.relClass, FlinkMetadata.ColumnNullCount.DEF)
- getColumnNullCount(rel, index)
- }
- }
-
- /**
- * Returns origin null count of the given column.
- *
- * @param rel the relational expression
- * @param index the index of the given column
- * @return the null count of the given column if can be estimated, else return null.
- */
- def getColumnOriginNullCount(rel: RelNode, index: Int): JDouble = {
- try {
- columnOriginNullCountHandler.getColumnOriginNullCount(rel, this, index)
- } catch {
- case e: JaninoRelMetadataProvider.NoHandler =>
- columnOriginNullCountHandler = revise(e.relClass, FlinkMetadata.ColumnOriginNullCount.DEF)
- getColumnOriginNullCount(rel, index)
- }
- }
-
- /**
- * Returns the (minimum) unique groups of the given columns.
- *
- * @param rel the relational expression
- * @param columns the given columns in a specified relational expression.
- * The given columns should not be null.
- * @return the (minimum) unique columns which should be a sub-collection of the given columns,
- * and should not be null or empty. If none unique columns can be found, return the
- * given columns.
- */
- def getUniqueGroups(rel: RelNode, columns: ImmutableBitSet): ImmutableBitSet = {
- try {
- require(columns != null)
- if (columns.isEmpty) {
- return columns
- }
- val uniqueGroups = uniqueGroupsHandler.getUniqueGroups(rel, this, columns)
- require(uniqueGroups != null && !uniqueGroups.isEmpty)
- require(columns.contains(uniqueGroups))
- uniqueGroups
- } catch {
- case e: JaninoRelMetadataProvider.NoHandler =>
- uniqueGroupsHandler = revise(e.relClass, FlinkMetadata.UniqueGroups.DEF)
- getUniqueGroups(rel, columns)
- }
- }
-
- /**
- * Returns the [[FlinkRelDistribution]] statistic.
- *
- * @param rel the relational expression
- * @return description of how the rows in the relational expression are
- * physically distributed
- */
- def flinkDistribution(rel: RelNode): FlinkRelDistribution = {
- try {
- distributionHandler.flinkDistribution(rel, this)
- } catch {
- case e: JaninoRelMetadataProvider.NoHandler =>
- distributionHandler = revise(e.relClass, FlinkMetadata.FlinkDistribution.DEF)
- flinkDistribution(rel)
- }
- }
-
- /**
- * Returns the [[RelModifiedMonotonicity]] statistic.
- *
- * @param rel the relational expression
- * @return the monotonicity for the corresponding RelNode
- */
- def getRelModifiedMonotonicity(rel: RelNode): RelModifiedMonotonicity = {
- try {
- modifiedMonotonicityHandler.getRelModifiedMonotonicity(rel, this)
- } catch {
- case e: JaninoRelMetadataProvider.NoHandler =>
- modifiedMonotonicityHandler = revise(e.relClass, FlinkMetadata.ModifiedMonotonicity.DEF)
- getRelModifiedMonotonicity(rel)
- }
- }
-
-}
-
-object FlinkRelMetadataQuery {
-
- def instance(): FlinkRelMetadataQuery = new FlinkRelMetadataQuery()
-
- /**
- * Reuse input metadataQuery instance if it could cast to FlinkRelMetadataQuery class,
- * or create one if not.
- *
- * @param mq metadataQuery which try to reuse
- * @return a FlinkRelMetadataQuery instance
- */
- def reuseOrCreate(mq: RelMetadataQuery): FlinkRelMetadataQuery = {
- mq match {
- case q: FlinkRelMetadataQuery => q
- case _ => FlinkRelMetadataQuery.instance()
- }
- }
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index e91a152..c2ce840 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -138,7 +138,7 @@ final class TestingAppendBaseRowSink(
this(rowTypeInfo, TimeZone.getTimeZone("UTC"))
}
- def invoke(value: BaseRow): Unit = localResults +=
+ override def invoke(value: BaseRow): Unit = localResults +=
BaseRowTestUtil.baseRowToString(value, rowTypeInfo, tz)
def getAppendResults: List[String] = getResults
@@ -150,7 +150,7 @@ final class TestingAppendSink(tz: TimeZone) extends AbstractExactlyOnceSink[Row]
this(TimeZone.getTimeZone("UTC"))
}
- def invoke(value: Row): Unit = localResults += TestSinkUtil.rowToString(value, tz)
+ override def invoke(value: Row): Unit = localResults += TestSinkUtil.rowToString(value, tz)
def getAppendResults: List[String] = getResults
}
@@ -210,7 +210,7 @@ final class TestingUpsertSink(keys: Array[Int], tz: TimeZone)
}
}
- def invoke(d: (Boolean, BaseRow)): Unit = {
+ override def invoke(d: (Boolean, BaseRow)): Unit = {
this.synchronized {
val wrapRow = new GenericRow(2)
wrapRow.setField(0, d._1)
@@ -438,7 +438,7 @@ class TestingRetractSink(tz: TimeZone)
}
}
- def invoke(v: (Boolean, Row)): Unit = {
+ override def invoke(v: (Boolean, Row)): Unit = {
this.synchronized {
val tupleString = "(" + v._1.toString + "," + TestSinkUtil.rowToString(v._2, tz) + ")"
localResults += tupleString