You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/15 22:54:42 UTC

[2/2] flink git commit: [FLINK-5567] [table] Introduce and migrate current table statistics to FlinkStatistic.

[FLINK-5567] [table] Introduce and migrate current table statistics to FlinkStatistic.

This closes #3197.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae0fbff7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae0fbff7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae0fbff7

Branch: refs/heads/master
Commit: ae0fbff76f327c008bdbf02cac0067bab507a04f
Parents: d6a97e4
Author: 槿瑜 <ji...@alibaba-inc.com>
Authored: Tue Jan 24 14:57:08 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Feb 15 23:53:48 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/schema/DataSetTable.scala  | 29 +------
 .../table/plan/schema/DataStreamTable.scala     |  6 +-
 .../flink/table/plan/schema/FlinkTable.scala    | 14 +++-
 .../table/plan/schema/TableSourceTable.scala    |  8 +-
 .../flink/table/plan/stats/FlinkStatistic.scala | 87 ++++++++++++++++++++
 5 files changed, 113 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
index f8c6835..0ce2a87 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
@@ -18,34 +18,13 @@
 
 package org.apache.flink.table.plan.schema
 
-import java.lang.Double
-import java.util
-import java.util.Collections
-
-import org.apache.calcite.rel.{RelCollation, RelDistribution}
-import org.apache.calcite.schema.Statistic
-import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.plan.stats.{FlinkStatistic, TableStats}
 
 class DataSetTable[T](
     val dataSet: DataSet[T],
     override val fieldIndexes: Array[Int],
-    override val fieldNames: Array[String])
-  extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames) {
-
-  override def getStatistic: Statistic = {
-    new DefaultDataSetStatistic
-  }
-
-}
-
-class DefaultDataSetStatistic extends Statistic {
-
-  override def getRowCount: Double = 1000d
-
-  override def getCollations: util.List[RelCollation] = Collections.emptyList()
-
-  override def isKey(columns: ImmutableBitSet): Boolean = false
-
-  override def getDistribution: RelDistribution = null
+    override val fieldNames: Array[String],
+    override val statistic: FlinkStatistic = FlinkStatistic.of(TableStats(1000L))) // 1000 rows
+  extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic) {
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
index 0355fac..6ce6570 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -19,10 +19,12 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.plan.stats.FlinkStatistic
 
 class DataStreamTable[T](
     val dataStream: DataStream[T],
     override val fieldIndexes: Array[Int],
-    override val fieldNames: Array[String])
-  extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
+    override val fieldNames: Array[String],
+    override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) // nothing known by default
+  extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) {
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index 971f54f..ea77061 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -19,16 +19,19 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
 
 abstract class FlinkTable[T](
     val typeInfo: TypeInformation[T],
     val fieldIndexes: Array[Int],
-    val fieldNames: Array[String])
+    val fieldNames: Array[String],
+    val statistic: FlinkStatistic)
   extends AbstractTable {
 
   if (fieldIndexes.length != fieldNames.length) {
@@ -64,4 +67,11 @@ abstract class FlinkTable[T](
     flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
   }
 
+  /**
+    * Returns the statistics of this table, i.e. the `statistic` constructor argument.
+    *
+    * @return the [[FlinkStatistic]] this table was created with
+    */
+  override def getStatistic: Statistic = statistic
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 4f82f5e..a3851e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -19,11 +19,15 @@
 package org.apache.flink.table.plan.schema
 
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.sources.TableSource
 
 /** Table which defines an external table via a [[TableSource]] */
-class TableSourceTable[T](val tableSource: TableSource[T])
+class TableSourceTable[T](
+    val tableSource: TableSource[T],
+    override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) // nothing known by default
   extends FlinkTable[T](
     typeInfo = tableSource.getReturnType,
     fieldIndexes = TableEnvironment.getFieldIndices(tableSource),
-    fieldNames = TableEnvironment.getFieldNames(tableSource))
+    fieldNames = TableEnvironment.getFieldNames(tableSource),
+    statistic)

http://git-wip-us.apache.org/repos/asf/flink/blob/ae0fbff7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
new file mode 100644
index 0000000..6f4ea00
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats
+
+import java.lang.Double
+import java.util.{Collections, List}
+
+import org.apache.calcite.rel.{RelCollation, RelDistribution}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.util.ImmutableBitSet
+
+/**
+  * Calcite statistics for a [[org.apache.flink.table.plan.schema.FlinkTable]].
+  * Only row count and column stats are tracked; no collations, keys or distribution.
+  * @param tableStats The table statistics, or None if nothing is known about the table.
+  */
+class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic {
+
+  /**
+    * Returns the table statistics.
+    *
+    * @return The table statistics, or null if they are unknown (tableStats is None).
+    */
+  def getTableStats: TableStats = tableStats.getOrElse(null)
+
+  /**
+    * Returns the stats of the specified column.
+    * NOTE(review): NPEs if `colStats` can be null (TableStats not visible here) - confirm.
+    * @param columnName The name of the column for which the stats are requested.
+    * @return The stats of the specified column, or null if no table stats are known.
+    */
+  def getColumnStats(columnName: String): ColumnStats = tableStats match {
+    case Some(tStats) => tStats.colStats.get(columnName)
+    case None => null
+  }
+
+  /**
+    * Returns the number of rows of the table.
+    *
+    * @return The number of rows (boxed), or null if no table stats are known.
+    */
+  override def getRowCount: Double = tableStats match {
+    case Some(tStats) => tStats.rowCount.toDouble
+    case None => null
+  }
+
+  override def getCollations: List[RelCollation] = Collections.emptyList()
+
+  override def isKey(columns: ImmutableBitSet): Boolean = false
+
+  override def getDistribution: RelDistribution = null
+
+}
+
+/**
+  * Factory methods and constants for creating [[FlinkStatistic]] instances.
+  */
+object FlinkStatistic {
+
+  /** A FlinkStatistic that knows nothing about a table; its stats getters return null. */
+  val UNKNOWN: FlinkStatistic = new FlinkStatistic(None)
+
+  /**
+    * Returns a FlinkStatistic backed by the given table statistics.
+    * Pass non-null stats (null would make getRowCount throw); use UNKNOWN instead.
+    * @param tableStats The table statistics; must not be null.
+    * @return A new FlinkStatistic wrapping `tableStats`.
+    */
+  def of(tableStats: TableStats): FlinkStatistic = new FlinkStatistic(Some(tableStats))
+
+}