You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:09 UTC
[40/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
deleted file mode 100644
index 6d00ab3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.{Programs, RuleSet}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.explain.PlanJsonParser
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
-import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable}
-import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
-import org.apache.flink.api.table.sources.BatchTableSource
-import org.apache.flink.types.Row
-
-/**
- * The abstract base class for batch TableEnvironments.
- *
- * A TableEnvironment can be used to:
- * - convert a [[DataSet]] to a [[Table]]
- * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
- * - scan a registered table to obtain a [[Table]]
- * - specify a SQL query on registered tables to obtain a [[Table]]
- * - convert a [[Table]] into a [[DataSet]]
- * - explain the AST and execution plan of a [[Table]]
- *
- * @param execEnv The [[ExecutionEnvironment]] which is wrapped in this [[BatchTableEnvironment]].
- * @param config The [[TableConfig]] of this [[BatchTableEnvironment]].
- */
-abstract class BatchTableEnvironment(
- private[flink] val execEnv: ExecutionEnvironment,
- config: TableConfig)
- extends TableEnvironment(config) {
-
- // a counter for unique table names.
- private val nameCntr: AtomicInteger = new AtomicInteger(0)
-
- // the naming pattern for internally registered tables.
- private val internalNamePattern = "^_DataSetTable_[0-9]+$".r
-
- /**
- * Checks if the chosen table name is valid.
- *
- * @param name The table name to check.
- */
- override protected def checkValidTableName(name: String): Unit = {
- val m = internalNamePattern.findFirstIn(name)
- m match {
- case Some(_) =>
- throw new TableException(s"Illegal Table name. " +
- s"Please choose a name that does not contain the pattern $internalNamePattern")
- case None =>
- }
- }
-
- /** Returns a unique table name according to the internal naming pattern. */
- protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement()
-
- /**
-  * Scans a registered table and returns the resulting [[Table]].
-  *
-  * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
-  *
-  * @param tableName The name of the table to scan.
-  * @throws TableException if no table is registered under the given name.
-  * @return The scanned table.
-  */
- @throws[TableException]
- def scan(tableName: String): Table = {
-   if (isRegistered(tableName)) {
-     new Table(this, CatalogNode(tableName, getRowType(tableName)))
-   } else {
-     throw new TableException(s"Table \'$tableName\' was not found in the registry.")
-   }
- }
-
- /**
- * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog.
- * Registered tables can be referenced in SQL queries.
- *
- * @param name The name under which the [[BatchTableSource]] is registered.
- * @param tableSource The [[BatchTableSource]] to register.
- */
- def registerTableSource(name: String, tableSource: BatchTableSource[_]): Unit = {
-
- checkValidTableName(name)
- registerTableInternal(name, new TableSourceTable(tableSource))
- }
-
- /**
- * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
- *
- * All tables referenced by the query must be registered in the TableEnvironment.
- *
- * @param query The SQL query to evaluate.
- * @return The result of the query as Table.
- */
- override def sql(query: String): Table = {
-
- val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
- // parse the sql query
- val parsed = planner.parse(query)
- // validate the sql query
- val validated = planner.validate(parsed)
- // transform to a relational tree
- val relational = planner.rel(validated)
-
- new Table(this, LogicalRelNode(relational.rel))
- }
-
- /**
- * Writes a [[Table]] to a [[TableSink]].
- *
- * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
- * [[TableSink]] to write it.
- *
- * @param table The [[Table]] to write.
- * @param sink The [[TableSink]] to write the [[Table]] to.
- * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
- */
- override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
-
- sink match {
- case batchSink: BatchTableSink[T] =>
- val outputType = sink.getOutputType
- // translate the Table into a DataSet and provide the type that the TableSink expects.
- val result: DataSet[T] = translate(table)(outputType)
- // Give the DataSet to the TableSink to emit it.
- batchSink.emitDataSet(result)
- case _ =>
- throw new TableException("BatchTableSink required to emit batch Table")
- }
- }
-
- /**
- * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
- * the result of the given [[Table]].
- *
- * @param table The table for which the AST and execution plan will be returned.
- * @param extended Flag to include detailed optimizer estimates.
- */
- private[flink] def explain(table: Table, extended: Boolean): String = {
- val ast = table.getRelNode
- val optimizedPlan = optimize(ast)
- val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
- dataSet.output(new DiscardingOutputFormat[Row])
- val env = dataSet.getExecutionEnvironment
- val jasonSqlPlan = env.getExecutionPlan
- val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
-
- s"== Abstract Syntax Tree ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(ast)}" +
- System.lineSeparator +
- s"== Optimized Logical Plan ==" +
- System.lineSeparator +
- s"${RelOptUtil.toString(optimizedPlan)}" +
- System.lineSeparator +
- s"== Physical Execution Plan ==" +
- System.lineSeparator +
- s"$sqlPlan"
- }
-
- /**
- * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
- * the result of the given [[Table]].
- *
- * @param table The table for which the AST and execution plan will be returned.
- */
- def explain(table: Table): String = explain(table: Table, extended = false)
-
- /**
- * Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog.
- *
- * @param name The name under which the table is registered in the catalog.
- * @param dataSet The [[DataSet]] to register as table in the catalog.
- * @tparam T the type of the [[DataSet]].
- */
- protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = {
-
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType)
- val dataSetTable = new DataSetTable[T](
- dataSet,
- fieldIndexes,
- fieldNames
- )
- registerTableInternal(name, dataSetTable)
- }
-
- /**
- * Registers a [[DataSet]] as a table under a given name with field names as specified by
- * field expressions in the [[TableEnvironment]]'s catalog.
- *
- * @param name The name under which the table is registered in the catalog.
- * @param dataSet The [[DataSet]] to register as table in the catalog.
- * @param fields The field expressions to define the field names of the table.
- * @tparam T The type of the [[DataSet]].
- */
- protected def registerDataSetInternal[T](
- name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
-
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
- val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
- registerTableInternal(name, dataSetTable)
- }
-
- /**
- * Returns the built-in rules that are defined by the environment.
- */
- protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES
-
- /**
-  * Generates the optimized [[RelNode]] tree from the original relational node tree.
-  *
-  * @param relNode The original [[RelNode]] tree
-  * @throws TableException if the optimizer cannot produce an execution plan for the query
-  * @return The optimized [[RelNode]] tree
-  */
- private[flink] def optimize(relNode: RelNode): RelNode = {
-   // decorrelate
-   val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
-
-   // optimize the logical Flink plan
-   val optProgram = Programs.ofRules(getRuleSet)
-   val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-
-   val dataSetPlan = try {
-     optProgram.run(getPlanner, decorPlan, flinkOutputProps)
-   } catch {
-     case e: CannotPlanException =>
-       throw new TableException(
-         s"Cannot generate a valid execution plan for the given query: \n\n" +
-           s"${RelOptUtil.toString(relNode)}\n" +
-           s"This exception indicates that the query uses an unsupported SQL feature.\n" +
-           s"Please check the documentation for the set of currently supported SQL features.")
-     case t: TableException =>
-       throw new TableException(
-         s"Cannot generate a valid execution plan for the given query: \n\n" +
-           s"${RelOptUtil.toString(relNode)}\n" +
-           s"${t.msg}\n" +
-           s"Please check the documentation for the set of currently supported SQL features.")
-     case a: AssertionError =>
-       throw Option(a.getCause).getOrElse(a) // getCause may be null: rethrow the assertion itself
-   }
-   dataSetPlan
- }
-
- /**
- * Translates a [[Table]] into a [[DataSet]].
- *
- * The transformation involves optimizing the relational expression tree as defined by
- * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
- *
- * @param table The root node of the relational expression tree.
- * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
- * @tparam A The type of the resulting [[DataSet]].
- * @return The [[DataSet]] that corresponds to the translated [[Table]].
- */
- protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
- val dataSetPlan = optimize(table.getRelNode)
- translate(dataSetPlan)
- }
-
- /**
-  * Translates a logical [[RelNode]] into a [[DataSet]].
-  *
-  * @param logicalPlan The root node of the relational expression tree.
-  * @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
-  * @tparam A The type of the resulting [[DataSet]].
-  * @return The [[DataSet]] that corresponds to the translated [[Table]].
-  * @throws TableException if the given plan is not a [[DataSetRel]].
-  */
- protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-   validateType(tpe)
-
-   logicalPlan match {
-     case node: DataSetRel =>
-       node.translateToPlan(this, Some(tpe.asInstanceOf[TypeInformation[Any]]))
-         .asInstanceOf[DataSet[A]]
-     // logicalPlan is not a DataSetRel: fail with a descriptive error instead of `???`
-     case _ => throw new TableException("Cannot generate DataSet due to an invalid logical plan.")
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
deleted file mode 100644
index 06b3edc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.plan.RelOptRule
-import org.apache.calcite.sql.SqlOperatorTable
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable
-import org.apache.calcite.tools.{RuleSets, RuleSet}
-import org.apache.flink.util.Preconditions
-
-import scala.collection.JavaConverters._
-
-/**
- * Builder for creating a Calcite configuration.
- */
-class CalciteConfigBuilder {
- private var replaceRules: Boolean = false
- private var ruleSets: List[RuleSet] = Nil
-
- private var replaceOperatorTable: Boolean = false
- private var operatorTables: List[SqlOperatorTable] = Nil
-
- private var replaceSqlParserConfig: Option[SqlParser.Config] = None
-
- /**
- * Replaces the built-in rule set with the given rule set.
- */
- def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
- Preconditions.checkNotNull(replaceRuleSet)
- ruleSets = List(replaceRuleSet)
- replaceRules = true
- this
- }
-
- /**
- * Appends the given rule set to the built-in rule set.
- */
- def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
- Preconditions.checkNotNull(addedRuleSet)
- ruleSets = addedRuleSet :: ruleSets
- this
- }
-
- /**
- * Replaces the built-in SQL operator table with the given table.
- */
- def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
- Preconditions.checkNotNull(replaceSqlOperatorTable)
- operatorTables = List(replaceSqlOperatorTable)
- replaceOperatorTable = true
- this
- }
-
- /**
- * Appends the given table to the built-in SQL operator table.
- */
- def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
- Preconditions.checkNotNull(addedSqlOperatorTable)
- this.operatorTables = addedSqlOperatorTable :: this.operatorTables
- this
- }
-
- /**
- * Replaces the built-in SQL parser configuration with the given configuration.
- */
- def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = {
- Preconditions.checkNotNull(sqlParserConfig)
- replaceSqlParserConfig = Some(sqlParserConfig)
- this
- }
-
- private class CalciteConfigImpl(
- val getRuleSet: Option[RuleSet],
- val replacesRuleSet: Boolean,
- val getSqlOperatorTable: Option[SqlOperatorTable],
- val replacesSqlOperatorTable: Boolean,
- val getSqlParserConfig: Option[SqlParser.Config])
- extends CalciteConfig
-
- /**
- * Builds a new [[CalciteConfig]].
- */
- def build(): CalciteConfig = new CalciteConfigImpl(
- ruleSets match {
- case Nil => None
- case h :: Nil => Some(h)
- case _ =>
- // concat rule sets
- val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])( (c, r) => r.asScala ++ c)
- Some(RuleSets.ofList(concatRules.asJava))
- },
- this.replaceRules,
- operatorTables match {
- case Nil => None
- case h :: Nil => Some(h)
- case _ =>
- // chain operator tables
- Some(operatorTables.reduce( (x, y) => ChainedSqlOperatorTable.of(x, y)))
- },
- this.replaceOperatorTable,
- replaceSqlParserConfig)
-}
-
-/**
- * Configuration hooks for customizing Calcite as used by the Table API and SQL.
- */
-trait CalciteConfig {
- /**
- * Returns whether this configuration replaces the built-in rule set.
- */
- def replacesRuleSet: Boolean
-
- /**
- * Returns a custom rule set.
- */
- def getRuleSet: Option[RuleSet]
-
- /**
- * Returns whether this configuration replaces the built-in SQL operator table.
- */
- def replacesSqlOperatorTable: Boolean
-
- /**
- * Returns a custom SQL operator table.
- */
- def getSqlOperatorTable: Option[SqlOperatorTable]
-
- /**
- * Returns a custom SQL parser configuration.
- */
- def getSqlParserConfig: Option[SqlParser.Config]
-}
-
-object CalciteConfig {
-
- val DEFAULT = createBuilder().build()
-
- /**
- * Creates a new builder for constructing a [[CalciteConfig]].
- */
- def createBuilder(): CalciteConfigBuilder = {
- new CalciteConfigBuilder
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
deleted file mode 100644
index b1ccc09..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
-import org.apache.calcite.sql.validate.{SqlValidatorImpl, SqlConformance}
-
-/**
- * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
- */
-class FlinkCalciteSqlValidator(
- opTab: SqlOperatorTable,
- catalogReader: CalciteCatalogReader,
- typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
- opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
-
- override def getLogicalSourceRowType(
- sourceRowType: RelDataType,
- insert: SqlInsert): RelDataType = {
- typeFactory.asInstanceOf[JavaTypeFactory].toSql(sourceRowType)
- }
-
- override def getLogicalTargetRowType(
- targetRowType: RelDataType,
- insert: SqlInsert): RelDataType = {
- typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
deleted file mode 100644
index 131cdc6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan.RelOptTable.ViewExpander
-import org.apache.calcite.plan._
-import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rel.RelRoot
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.RexBuilder
-import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.sql.parser.{SqlParseException => CSqlParseException, SqlParser}
-import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
-import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
-import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
-
-import scala.collection.JavaConversions._
-
-/**
- * NOTE: this is heavily inspired by Calcite's PlannerImpl.
- * We need it in order to share the planner between the Table API relational plans
- * and the SQL relational plans that are created by the Calcite parser.
- * The main difference is that we do not create a new RelOptPlanner in the ready() method.
- */
-class FlinkPlannerImpl(
- config: FrameworkConfig,
- planner: RelOptPlanner,
- typeFactory: FlinkTypeFactory) {
-
- val operatorTable: SqlOperatorTable = config.getOperatorTable
- /** Holds the trait definitions to be registered with planner. May be null. */
- val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
- val parserConfig: SqlParser.Config = config.getParserConfig
- val convertletTable: SqlRexConvertletTable = config.getConvertletTable
- val defaultSchema: SchemaPlus = config.getDefaultSchema
-
- var validator: FlinkCalciteSqlValidator = _
- var validatedSqlNode: SqlNode = _
- var root: RelRoot = _
-
- /** Replaces the planner's trait definitions with those of the framework config, if it has any. */
- private def ready(): Unit = {
-   if (this.traitDefs != null) {
-     planner.clearRelTraitDefs()
-     // iterating the Java list relies on this file's implicit JavaConversions import
-     for (traitDef <- this.traitDefs) planner.addRelTraitDef(traitDef)
-   }
- }
-
- def parse(sql: String): SqlNode = {
- try {
- ready()
- val parser: SqlParser = SqlParser.create(sql, parserConfig)
- val sqlNode: SqlNode = parser.parseStmt
- sqlNode
- } catch {
- case e: CSqlParseException =>
- throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
- }
- }
-
- def validate(sqlNode: SqlNode): SqlNode = {
- validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
- validator.setIdentifierExpansion(true)
- try {
- validatedSqlNode = validator.validate(sqlNode)
- }
- catch {
- case e: RuntimeException =>
- throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
- }
- validatedSqlNode
- }
-
- def rel(sql: SqlNode): RelRoot = {
- try {
- assert(validatedSqlNode != null)
- val rexBuilder: RexBuilder = createRexBuilder
- val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
- val config = SqlToRelConverter.configBuilder()
- .withTrimUnusedFields(false).withConvertTableAccess(false).build()
- val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
- new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
- root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
- // we disable automatic flattening in order to let composite types pass without modification
- // we might enable it again once Calcite has better support for structured types
- // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
- root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
- root
- } catch {
- case e: RelConversionException => throw TableException(e.getMessage)
- }
- }
-
- /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
- * interface for [[org.apache.calcite.tools.Planner]]. */
- class ViewExpanderImpl extends ViewExpander {
-
- /** Parses, validates and converts the view query into a [[RelRoot]] kept in the outer `root`. */
- override def expandView(
-     rowType: RelDataType,
-     queryString: String,
-     schemaPath: util.List[String],
-     viewPath: util.List[String]): RelRoot = {
-   val parser: SqlParser = SqlParser.create(queryString, parserConfig)
-   // `try` is an expression, so the parsed node is bound once instead of through a null `var`
-   val sqlNode: SqlNode =
-     try {
-       parser.parseQuery
-     } catch {
-       case e: CSqlParseException =>
-         throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
-     }
-   val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
-   val validator: SqlValidator =
-     new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
-   validator.setIdentifierExpansion(true)
-   val validatedSqlNode: SqlNode = validator.validate(sqlNode)
-   val rexBuilder: RexBuilder = createRexBuilder
-   val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
-   val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
-     .withTrimUnusedFields(false).withConvertTableAccess(false).build
-   val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-     new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
-   root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
-   root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
-   root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
-   FlinkPlannerImpl.this.root
- }
- }
-
- private def createCatalogReader: CalciteCatalogReader = {
- val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
- new CalciteCatalogReader(
- CalciteSchema.from(rootSchema),
- parserConfig.caseSensitive,
- CalciteSchema.from(defaultSchema).path(null),
- typeFactory)
- }
-
- private def createRexBuilder: RexBuilder = {
- new RexBuilder(typeFactory)
- }
-
-}
-
-object FlinkPlannerImpl {
- private def rootSchema(schema: SchemaPlus): SchemaPlus = {
- if (schema.getParentSchema == null) {
- schema
- }
- else {
- rootSchema(schema.getParentSchema)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
deleted file mode 100644
index 8508e53..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util.Collections
-
-import org.apache.calcite.plan.volcano.VolcanoPlanner
-import java.lang.Iterable
-
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan._
-import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.calcite.rex.RexBuilder
-import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
-import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.expressions.WindowProperty
-import org.apache.flink.api.table.plan.logical.LogicalWindow
-import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate
-
-/**
- * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
- */
-class FlinkRelBuilder(
- context: Context,
- relOptCluster: RelOptCluster,
- relOptSchema: RelOptSchema)
- extends RelBuilder(
- context,
- relOptCluster,
- relOptSchema) {
-
- def getPlanner: RelOptPlanner = cluster.getPlanner
-
- def getCluster: RelOptCluster = relOptCluster
-
- override def getTypeFactory: FlinkTypeFactory =
- super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
- def aggregate(
- window: LogicalWindow,
- groupKey: GroupKey,
- namedProperties: Seq[NamedWindowProperty],
- aggCalls: Iterable[AggCall])
- : RelBuilder = {
- // build logical aggregate
- val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]
-
- // build logical window aggregate from it
- push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
- this
- }
-
-}
-
-object FlinkRelBuilder {
-
- def create(config: FrameworkConfig): FlinkRelBuilder = {
-
- // create Flink type factory
- val typeSystem = config.getTypeSystem
- val typeFactory = new FlinkTypeFactory(typeSystem)
-
- // create context instances with Flink type factory
- val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
- planner.setExecutor(config.getExecutor)
- planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
- val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
- val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
- val relOptSchema = new CalciteCatalogReader(
- calciteSchema,
- config.getParserConfig.caseSensitive(),
- Collections.emptyList(),
- typeFactory)
-
- new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
- }
-
- /**
- * Information necessary to create a window aggregate.
- *
- * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
- */
- case class NamedWindowProperty(name: String, property: WindowProperty)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
deleted file mode 100644
index 8dcd660..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
-import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.java.typeutils.ValueTypeInfo._
-import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
-import org.apache.flink.api.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple
-
-import scala.collection.mutable
-
-/**
- * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
- * and Calcite's [[RelDataType]].
- */
-class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
-
-  // NOTE: for future data types it might be necessary to
-  // override more methods of RelDataTypeFactoryImpl
-
-  // RelDataTypes that were already created for advanced (non-simple) types
-  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
-
-  /**
-    * Converts a Flink [[TypeInformation]] into a Calcite [[RelDataType]].
-    *
-    * Simple types are translated to plain SQL types. Every other type is wrapped into a
-    * dedicated [[RelDataType]] that stores the original [[TypeInformation]]; such types are
-    * created once per [[TypeInformation]] and looked up afterwards.
-    *
-    * @param typeInfo The type information to convert.
-    * @return The corresponding [[RelDataType]].
-    */
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
-    if (!isSimple(typeInfo)) {
-      // advanced types need a specific RelDataType that keeps the TypeInformation
-      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
-    } else {
-      // simple types can be converted to SQL types and vice versa
-      typeInfoToSqlTypeName(typeInfo) match {
-
-        case INTERVAL_YEAR_MONTH =>
-          val yearToMonth =
-            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)
-          createSqlIntervalType(yearToMonth)
-
-        case INTERVAL_DAY_SECOND =>
-          val dayToSecond =
-            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)
-          createSqlIntervalType(dayToSecond)
-
-        case plainSqlType =>
-          createSqlType(plainSqlType)
-      }
-    }
-  }
-
-  /**
-    * Builds a struct type out of field names and field types using this type factory.
-    *
-    * Names and types are paired by position and every field is marked as nullable.
-    *
-    * @param fieldNames field names
-    * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
-    * @return a struct type with the given field names and field types
-    */
-  def buildRowDataType(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]])
-    : RelDataType = {
-    val rowBuilder = builder
-    fieldNames.zip(fieldTypes).foreach { case (fieldName, fieldType) =>
-      rowBuilder.add(fieldName, createTypeFromTypeInfo(fieldType)).nullable(true)
-    }
-    rowBuilder.build
-  }
-
-  /**
-    * Creates a SQL type with the given precision.
-    *
-    * Inferred VARCHAR types can overflow to a negative precision because their default
-    * precision is Int.MaxValue; in that case the default precision is used instead.
-    */
-  override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
-    if (typeName != VARCHAR || precision >= 0) {
-      super.createSqlType(typeName, precision)
-    } else {
-      createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
-    }
-  }
-
-  /**
-    * Creates a nullable array type for the given element type, backed by an
-    * [[ObjectArrayTypeInfo]]. The `maxCardinality` argument is not used.
-    */
-  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
-    val elementTypeInfo = FlinkTypeFactory.toTypeInfo(elementType)
-    new ArrayRelDataType(ObjectArrayTypeInfo.getInfoFor(elementTypeInfo), elementType, true)
-  }
-
-  /**
-    * Creates the [[RelDataType]] wrapper for a type that has no plain SQL counterpart.
-    */
-  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    case composite: CompositeType[_] =>
-      new CompositeRelDataType(composite, this)
-
-    case primitiveArray: PrimitiveArrayTypeInfo[_] =>
-      val componentType = createTypeFromTypeInfo(primitiveArray.getComponentType)
-      new ArrayRelDataType(primitiveArray, componentType, false)
-
-    case objectArray: ObjectArrayTypeInfo[_, _] =>
-      val componentType = createTypeFromTypeInfo(objectArray.getComponentInfo)
-      new ArrayRelDataType(objectArray, componentType, true)
-
-    case _: TypeInformation[_] =>
-      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
-
-    case other =>
-      throw TableException(s"Unsupported type information: $other")
-  }
-
-  /**
-    * Changes the nullability of a type. Composite types are returned unchanged because
-    * nullability is not tracked for them at the moment.
-    */
-  override def createTypeWithNullability(
-      relDataType: RelDataType,
-      nullable: Boolean)
-    : RelDataType = relDataType match {
-    case _: CompositeRelDataType =>
-      // at the moment we do not care about nullability
-      relDataType
-    case _ =>
-      super.createTypeWithNullability(relDataType, nullable)
-  }
-}
-
-object FlinkTypeFactory {
-
-  /**
-    * Maps the [[TypeInformation]] of a simple type to its [[SqlTypeName]].
-    *
-    * @param typeInfo The type information of a simple type.
-    * @return The SQL type name that represents the given type.
-    * @throws TableException if the type is a character type or has no SQL counterpart.
-    */
-  private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
-    case BOOLEAN_TYPE_INFO => BOOLEAN
-    case BYTE_TYPE_INFO => TINYINT
-    case SHORT_TYPE_INFO => SMALLINT
-    case INT_TYPE_INFO => INTEGER
-    case LONG_TYPE_INFO => BIGINT
-    case FLOAT_TYPE_INFO => FLOAT
-    case DOUBLE_TYPE_INFO => DOUBLE
-    case STRING_TYPE_INFO => VARCHAR
-    case BIG_DEC_TYPE_INFO => DECIMAL
-
-    // temporal types
-    case SqlTimeTypeInfo.DATE => DATE
-    case SqlTimeTypeInfo.TIME => TIME
-    case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
-    case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
-    case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
-
-    case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
-      throw TableException("Character type is not supported.")
-
-    case unsupported =>
-      throw TableException(s"Type is not supported: $unsupported")
-  }
-
-  /**
-    * Maps a Calcite [[RelDataType]] back to a Flink [[TypeInformation]].
-    *
-    * The SQL type name selects the mapping. For ANY, ROW and ARRAY the wrapped
-    * [[TypeInformation]] is extracted when the type is one of the Flink-specific
-    * [[RelDataType]] implementations; the cases are tried in order, so a ROW that is not a
-    * [[CompositeRelDataType]] falls through to the placeholder case.
-    *
-    * @param relDataType The Calcite type to convert.
-    * @return The corresponding [[TypeInformation]].
-    * @throws TableException if the type is NULL or not supported.
-    */
-  def toTypeInfo(relDataType: RelDataType): TypeInformation[_] =
-    (relDataType.getSqlTypeName, relDataType) match {
-      case (BOOLEAN, _) => BOOLEAN_TYPE_INFO
-      case (TINYINT, _) => BYTE_TYPE_INFO
-      case (SMALLINT, _) => SHORT_TYPE_INFO
-      case (INTEGER, _) => INT_TYPE_INFO
-      case (BIGINT, _) => LONG_TYPE_INFO
-      case (FLOAT, _) => FLOAT_TYPE_INFO
-      case (DOUBLE, _) => DOUBLE_TYPE_INFO
-      case (VARCHAR | CHAR, _) => STRING_TYPE_INFO
-      case (DECIMAL, _) => BIG_DEC_TYPE_INFO
-
-      // temporal types
-      case (DATE, _) => SqlTimeTypeInfo.DATE
-      case (TIME, _) => SqlTimeTypeInfo.TIME
-      case (TIMESTAMP, _) => SqlTimeTypeInfo.TIMESTAMP
-      case (typeName, _) if YEAR_INTERVAL_TYPES.contains(typeName) =>
-        TimeIntervalTypeInfo.INTERVAL_MONTHS
-      case (typeName, _) if DAY_INTERVAL_TYPES.contains(typeName) =>
-        TimeIntervalTypeInfo.INTERVAL_MILLIS
-
-      case (NULL, _) =>
-        throw TableException(
-          "Type NULL is not supported. Null values must have a supported type.")
-
-      // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-      // are represented as integer
-      case (SYMBOL, _) => INT_TYPE_INFO
-
-      // extract encapsulated TypeInformation
-      case (ANY, generic: GenericRelDataType) =>
-        generic.typeInfo
-
-      case (ROW, composite: CompositeRelDataType) =>
-        composite.compositeType
-
-      // ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder
-      case (ROW | CURSOR, _) => new NothingTypeInfo
-
-      case (ARRAY, array: ArrayRelDataType) =>
-        array.typeInfo
-
-      case (unsupported, _) =>
-        throw TableException(s"Type is not supported: $unsupported")
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
deleted file mode 100644
index 3222eee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
-import org.apache.calcite.sql.`type`.SqlTypeName
-
-/**
- * Custom type system for Flink.
- */
-class FlinkTypeSystem extends RelDataTypeSystemImpl {
-
-  // Int.MaxValue cannot be used here because it overflows in Calcite's type inference
-  // logic; half of it should be enough for all use cases
-  override def getMaxNumericScale: Int = Int.MaxValue / 2
-
-  // Int.MaxValue cannot be used here because it overflows in Calcite's type inference
-  // logic; half of it should be enough for all use cases
-  override def getMaxNumericPrecision: Int = Int.MaxValue / 2
-
-  /**
-    * Returns the default precision of a SQL type, overriding it for VARCHAR and TIMESTAMP
-    * and delegating to the parent type system for every other type.
-    */
-  override def getDefaultPrecision(typeName: SqlTypeName): Int = {
-    if (typeName == SqlTypeName.VARCHAR) {
-      // by default all VARCHARs can have the Java default length
-      Int.MaxValue
-    } else if (typeName == SqlTypeName.TIMESTAMP) {
-      // we currently support only timestamps with milliseconds precision
-      3
-    } else {
-      super.getDefaultPrecision(typeName)
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
deleted file mode 100644
index da20e07..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.{Programs, RuleSet}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.explain.PlanJsonParser
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
-import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable}
-import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
-import org.apache.flink.api.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.types.Row
-
-/**
- * The base class for stream TableEnvironments.
- *
- * A TableEnvironment can be used to:
- * - convert [[DataStream]] to a [[Table]]
- * - register a [[DataStream]] as a table in the catalog
- * - register a [[Table]] in the catalog
- * - scan a registered table to obtain a [[Table]]
- * - specify a SQL query on registered tables to obtain a [[Table]]
- * - convert a [[Table]] into a [[DataStream]]
- *
- * @param execEnv The [[StreamExecutionEnvironment]] which is wrapped in this
- * [[StreamTableEnvironment]].
- * @param config The [[TableConfig]] of this [[StreamTableEnvironment]].
- */
-abstract class StreamTableEnvironment(
-    private[flink] val execEnv: StreamExecutionEnvironment,
-    config: TableConfig)
-  extends TableEnvironment(config) {
-
-  // a counter for unique table names
-  private val nameCntr: AtomicInteger = new AtomicInteger(0)
-
-  // the naming pattern for internally registered tables.
-  private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
-
-  /**
-    * Checks if the chosen table name is valid.
-    *
-    * A name is rejected if it matches the naming pattern that is used for internally
-    * registered tables.
-    *
-    * @param name The table name to check.
-    * @throws TableException if the name matches the internal naming pattern.
-    */
-  override protected def checkValidTableName(name: String): Unit = {
-    if (internalNamePattern.findFirstIn(name).isDefined) {
-      throw new TableException(s"Illegal Table name. " +
-        s"Please choose a name that does not contain the pattern $internalNamePattern")
-    }
-  }
-
-  /** Returns a unique table name according to the internal naming pattern. */
-  protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]].
-    *
-    * Field names are automatically extracted for
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    * The method fails if inputType is not a
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    *
-    * @param inputType The TypeInformation extract the field names and positions from.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    * @throws TableException if one of the field names is the reserved name 'rowtime'.
-    */
-  override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
-    : (Array[String], Array[Int]) = {
-    val fieldInfo = super.getFieldInfo(inputType)
-    if (fieldInfo._1.contains("rowtime")) {
-      throw new TableException("'rowtime' is a reserved field name in stream environment.")
-    }
-    fieldInfo
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
-    *
-    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-    * @param exprs The expressions that define the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    * @throws TableException if one of the field names is the reserved name 'rowtime'.
-    */
-  override protected[flink] def getFieldInfo[A](
-      inputType: TypeInformation[A],
-      exprs: Array[Expression])
-    : (Array[String], Array[Int]) = {
-    val fieldInfo = super.getFieldInfo(inputType, exprs)
-    if (fieldInfo._1.contains("rowtime")) {
-      throw new TableException("'rowtime' is a reserved field name in stream environment.")
-    }
-    fieldInfo
-  }
-
-  /**
-    * Ingests a registered table and returns the resulting [[Table]].
-    *
-    * The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
-    *
-    * @param tableName The name of the table to ingest.
-    * @throws ValidationException if no table is registered under the given name.
-    * @return The ingested table.
-    */
-  @throws[ValidationException]
-  def ingest(tableName: String): Table = {
-
-    if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
-    }
-    else {
-      throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
-    }
-  }
-
-  /**
-    * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * @param name The name under which the [[StreamTableSource]] is registered.
-    * @param tableSource The [[org.apache.flink.api.table.sources.StreamTableSource]] to register.
-    */
-  def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = {
-
-    checkValidTableName(name)
-    registerTableInternal(name, new TableSourceTable(tableSource))
-  }
-
-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    *
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  override def sql(query: String): Table = {
-
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
-    // parse the sql query
-    val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
-
-    new Table(this, LogicalRelNode(relational.rel))
-  }
-
-  /**
-    * Writes a [[Table]] to a [[TableSink]].
-    *
-    * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
-    * [[TableSink]] to write it.
-    *
-    * @param table The [[Table]] to write.
-    * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
-    * @throws TableException if the sink is not a [[StreamTableSink]].
-    */
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
-
-    sink match {
-      case streamSink: StreamTableSink[T] =>
-        val outputType = sink.getOutputType
-        // translate the Table into a DataStream and provide the type that the TableSink expects.
-        val result: DataStream[T] = translate(table)(outputType)
-        // Give the DataStream to the TableSink to emit it.
-        streamSink.emitDataStream(result)
-      case _ =>
-        throw new TableException("StreamTableSink required to emit streaming Table")
-    }
-  }
-
-  /**
-    * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
-    * catalog.
-    *
-    * @param name The name under which the table is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register as table in the catalog.
-    * @tparam T the type of the [[DataStream]].
-    */
-  protected def registerDataStreamInternal[T](
-      name: String,
-      dataStream: DataStream[T]): Unit = {
-
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
-    val dataStreamTable = new DataStreamTable[T](
-      dataStream,
-      fieldIndexes,
-      fieldNames
-    )
-    registerTableInternal(name, dataStreamTable)
-  }
-
-  /**
-    * Registers a [[DataStream]] as a table under a given name with field names as specified by
-    * field expressions in the [[TableEnvironment]]'s catalog.
-    *
-    * @param name The name under which the table is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register as table in the catalog.
-    * @param fields The field expressions to define the field names of the table.
-    * @tparam T The type of the [[DataStream]].
-    */
-  protected def registerDataStreamInternal[T](
-      name: String,
-      dataStream: DataStream[T],
-      fields: Array[Expression]): Unit = {
-
-    // all three values are already arrays, no conversion is needed
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
-    val dataStreamTable = new DataStreamTable[T](
-      dataStream,
-      fieldIndexes,
-      fieldNames
-    )
-    registerTableInternal(name, dataStreamTable)
-  }
-
-  /**
-    * Returns the built-in rules that are defined by the environment.
-    */
-  protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
-
-  /**
-    * Generates the optimized [[RelNode]] tree from the original relational node tree.
-    *
-    * The tree is decorrelated first and then optimized with the rule set of this environment
-    * towards the [[DataStreamConvention]].
-    *
-    * @param relNode The root node of the relational expression tree.
-    * @return The optimized [[RelNode]] tree
-    * @throws TableException if no valid execution plan can be generated.
-    */
-  private[flink] def optimize(relNode: RelNode): RelNode = {
-    // decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
-
-    // optimize the logical Flink plan
-    val optProgram = Programs.ofRules(getRuleSet)
-    val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify()
-
-    try {
-      optProgram.run(getPlanner, decorPlan, flinkOutputProps)
-    }
-    catch {
-      case e: CannotPlanException =>
-        throw TableException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-            s"${RelOptUtil.toString(relNode)}\n" +
-            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
-            s"Please check the documentation for the set of currently supported SQL features.", e)
-    }
-  }
-
-
-  /**
-    * Translates a [[Table]] into a [[DataStream]].
-    *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
-    *
-    * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
-    */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-    val dataStreamPlan = optimize(table.getRelNode)
-    translate(dataStreamPlan)
-  }
-
-  /**
-    * Translates a logical [[RelNode]] into a [[DataStream]].
-    *
-    * @param logicalPlan The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
-    * @throws TableException if the root of the plan is not a [[DataStreamRel]].
-    */
-  protected def translate[A]
-      (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-
-    validateType(tpe)
-
-    logicalPlan match {
-      case node: DataStreamRel =>
-        node.translateToPlan(
-          this,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[DataStream[A]]
-      case _ =>
-        // fail with a descriptive error instead of a message-less NotImplementedError
-        throw TableException(
-          "Cannot generate DataStream due to an invalid logical plan. " +
-            "This is a bug and should not happen. Please file an issue.")
-    }
-  }
-
-  /**
-    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
-    * the result of the given [[Table]].
-    *
-    * @param table The table for which the AST and execution plan will be returned.
-    */
-  def explain(table: Table): String = {
-    val ast = table.getRelNode
-    val optimizedPlan = optimize(ast)
-    val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
-
-    val env = dataStream.getExecutionEnvironment
-    val jsonSqlPlan = env.getExecutionPlan
-
-    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
-
-    s"== Abstract Syntax Tree ==" +
-      System.lineSeparator +
-      s"${RelOptUtil.toString(ast)}" +
-      System.lineSeparator +
-      s"== Optimized Logical Plan ==" +
-      System.lineSeparator +
-      s"${RelOptUtil.toString(optimizedPlan)}" +
-      System.lineSeparator +
-      s"== Physical Execution Plan ==" +
-      System.lineSeparator +
-      s"$sqlPlan"
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
deleted file mode 100644
index 37d9cb5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import java.util.TimeZone
-
-/**
- * A config to define the runtime behavior of the Table API.
- */
-class TableConfig {
-
-  /**
-    * Defines the timezone for date/time/timestamp conversions. Defaults to UTC.
-    */
-  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")
-
-  /**
-    * Defines if all fields need to be checked for NULL first. Enabled by default.
-    */
-  private var nullCheck: Boolean = true
-
-  /**
-    * Defines if efficient types (such as Tuple types or Atomic types)
-    * should be used within operators where possible. Disabled by default.
-    */
-  private var efficientTypeUsage: Boolean = false
-
-  /**
-    * Defines the configuration of Calcite for Table API and SQL queries.
-    */
-  private var calciteConfig: CalciteConfig = CalciteConfig.DEFAULT
-
-  /**
-    * Sets the timezone for date/time/timestamp conversions.
-    *
-    * @param timeZone The timezone to use; must not be null.
-    */
-  def setTimeZone(timeZone: TimeZone): Unit = {
-    require(timeZone != null, "timeZone must not be null.")
-    this.timeZone = timeZone
-  }
-
-  /**
-    * Returns the timezone for date/time/timestamp conversions.
-    */
-  def getTimeZone: TimeZone = timeZone
-
-  /**
-    * Returns the NULL check. If enabled, all fields need to be checked for NULL first.
-    */
-  def getNullCheck: Boolean = nullCheck
-
-  /**
-    * Sets the NULL check. If enabled, all fields need to be checked for NULL first.
-    */
-  def setNullCheck(nullCheck: Boolean): Unit = {
-    this.nullCheck = nullCheck
-  }
-
-  /**
-    * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def getEfficientTypeUsage: Boolean = efficientTypeUsage
-
-  /**
-    * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
-    this.efficientTypeUsage = efficientTypeUsage
-  }
-
-  /**
-    * Returns the current configuration of Calcite for Table API and SQL queries.
-    */
-  def getCalciteConfig: CalciteConfig = calciteConfig
-
-  /**
-    * Sets the configuration of Calcite for Table API and SQL queries.
-    * Changing the configuration has no effect after the first query has been defined.
-    */
-  def setCalciteConfig(calciteConfig: CalciteConfig): Unit = {
-    this.calciteConfig = calciteConfig
-  }
-}
-
-object TableConfig {
-
-  /**
-    * Creates a [[TableConfig]] with default settings. Every call returns a new instance.
-    */
-  def DEFAULT: TableConfig = new TableConfig()
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
deleted file mode 100644
index 07ea860..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.lang.reflect.Modifier
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.calcite.config.Lex
-import org.apache.calcite.plan.RelOptPlanner
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.SqlOperatorTable
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable
-import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
-import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
-import org.apache.flink.api.table.codegen.ExpressionReducer
-import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.RelTable
-import org.apache.flink.api.table.sinks.TableSink
-import org.apache.flink.api.table.validate.FunctionCatalog
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
-import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
-
-import scala.collection.JavaConverters._
-
-/**
- * The abstract base class for batch and stream TableEnvironments.
- *
- * @param config The configuration of the TableEnvironment
- */
-abstract class TableEnvironment(val config: TableConfig) {
-
- // the catalog to hold all registered and translated tables
- private val tables: SchemaPlus = Frameworks.createRootSchema(true)
-
- // Table API/SQL function catalog
- private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
-
- // the configuration to create a Calcite planner
- // lazy: the parser config and the operator table are read from `config` on first access
- private lazy val frameworkConfig: FrameworkConfig = Frameworks
- .newConfigBuilder
- // SQL queries resolve table names against the catalog of this environment
- .defaultSchema(tables)
- .parserConfig(getSqlParserConfig)
- .costFactory(new DataSetCostFactory)
- .typeSystem(new FlinkTypeSystem)
- .operatorTable(getSqlOperatorTable)
- // set the executor to evaluate constant expressions
- .executor(new ExpressionReducer(config))
- .build
-
- // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
- // lazy: its first access also initializes frameworkConfig
- protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
-
- // the planner instance used to optimize queries of this TableEnvironment
- private lazy val planner: RelOptPlanner = relBuilder.getPlanner
-
- // the type factory, obtained from the relBuilder
- private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
-
- // a counter for unique attribute names
- private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
-
- /**
-  * Returns the table config to define the runtime behavior of the Table API.
-  *
-  * @return The [[TableConfig]] this TableEnvironment was created with.
-  */
- def getConfig: TableConfig = config
-
- /**
-  * Builds the SQL operator table of this environment.
-  *
-  * Without a custom operator table in the Calcite configuration, the operator table of the
-  * function catalog is used. A custom table either replaces it or is chained behind it,
-  * depending on `replacesSqlOperatorTable`.
-  */
- protected def getSqlOperatorTable: SqlOperatorTable = {
-   val customConfig = config.getCalciteConfig
-   customConfig.getSqlOperatorTable
-     .map { customTable =>
-       if (customConfig.replacesSqlOperatorTable) {
-         customTable
-       } else {
-         ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, customTable)
-       }
-     }
-     .getOrElse(functionCatalog.getSqlOperatorTable)
- }
-
- /**
-  * Builds the rule set of this environment.
-  *
-  * Without a custom rule set in the Calcite configuration, the built-in rules are used.
-  * A custom rule set either replaces the built-in rules or is appended to them, depending on
-  * `replacesRuleSet`.
-  */
- protected def getRuleSet: RuleSet = {
-   val customConfig = config.getCalciteConfig
-   customConfig.getRuleSet
-     .map { customRules =>
-       if (customConfig.replacesRuleSet) {
-         customRules
-       } else {
-         // built-in rules first, custom rules behind them
-         val combinedRules = getBuiltInRuleSet.asScala ++ customRules.asScala
-         RuleSets.ofList(combinedRules.asJava)
-       }
-     }
-     .getOrElse(getBuiltInRuleSet)
- }
-
- /**
-  * Returns the SQL parser config of this environment. A parser config defined in the custom
-  * Calcite configuration takes precedence over the default.
-  */
- protected def getSqlParserConfig: SqlParser.Config = {
-   config.getCalciteConfig.getSqlParserConfig.getOrElse {
-     // we use Java lex because back ticks are easier than double quotes in programming
-     // and cases are preserved
-     SqlParser.configBuilder().setLex(Lex.JAVA).build()
-   }
- }
-
- /**
-  * Returns the built-in rules that are defined by the environment.
-  *
-  * Abstract: no implementation is provided in this class. [[getRuleSet]] uses the result
-  * when no custom rule set is configured and combines it with a custom rule set that does
-  * not replace the built-in rules.
-  */
- protected def getBuiltInRuleSet: RuleSet
-
- /**
-  * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
-  * user-defined functions under this name.
-  *
-  * @param name The name under which the function is registered.
-  * @param function The scalar function to register.
-  */
- def registerFunction(name: String, function: ScalarFunction): Unit = {
-   val functionClass = function.getClass
-
-   // the function class must be instantiable
-   checkForInstantiation(functionClass)
-
-   // make the function known to the Table API ...
-   functionCatalog.registerFunction(name, functionClass)
-
-   // ... and to the SQL API
-   val sqlFunction = createScalarSqlFunction(name, function, typeFactory)
-   functionCatalog.registerSqlFunction(sqlFunction)
- }
-
- /**
-  * Registers a [[TableFunction]] under a unique name. Replaces already existing
-  * user-defined functions under this name.
-  *
-  * The result type is taken from `function.getResultType`; if that is null, the implicit
-  * [[TypeInformation]] of `T` is used instead.
-  *
-  * @param name The name under which the function is registered.
-  * @param function The table function to register.
-  * @tparam T The type of the rows emitted by the table function.
-  */
- private[flink] def registerTableFunctionInternal[T: TypeInformation](
-     name: String, function: TableFunction[T]): Unit = {
-   // check if class not Scala object
-   checkNotSingleton(function.getClass)
-   // check if class could be instantiated
-   checkForInstantiation(function.getClass)
-
-   // getResultType is user code: call it only once and fall back to the implicit type
-   // information if it yields null
-   val typeInfo: TypeInformation[_] =
-     Option(function.getResultType).getOrElse(implicitly[TypeInformation[T]])
-
-   // register in Table API
-   functionCatalog.registerFunction(name, function.getClass)
-
-   // register in SQL API
-   val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
-   functionCatalog.registerSqlFunctions(sqlFunctions)
- }
-
- /**
-  * Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
-  * Registered tables can be referenced in SQL queries.
-  *
-  * Fails with a [[TableException]] if the table's `tableEnv` is not this TableEnvironment.
-  *
-  * @param name The name under which the table is registered.
-  * @param table The table to register.
-  */
- def registerTable(name: String, table: Table): Unit = {
-   // reject tables whose environment is a different TableEnvironment
-   val ownedByThisEnv = table.tableEnv == this
-   if (!ownedByThisEnv) {
-     throw new TableException(
-       "Only tables that belong to this TableEnvironment can be registered.")
-   }
-
-   checkValidTableName(name)
-   registerTableInternal(name, new RelTable(table.getRelNode))
- }
-
- /**
-  * Replaces a registered Table with another Table under the same name.
-  * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]]
-  * with a [[org.apache.calcite.schema.TranslatableTable]].
-  *
-  * Fails with a [[TableException]] if no table is registered under the name.
-  *
-  * @param name Name of the table to replace.
-  * @param table The table that replaces the previous table.
-  */
- protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
-   // only existing registrations can be replaced
-   if (!isRegistered(name)) {
-     throw new TableException(s"Table \'$name\' is not registered.")
-   }
-   tables.add(name, table)
- }
-
- /**
-  * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-  *
-  * All tables referenced by the query must be registered in the TableEnvironment.
-  *
-  * Abstract: no implementation is provided in this class.
-  *
-  * @param query The SQL query to evaluate.
-  * @return The result of the query as Table.
-  */
- def sql(query: String): Table
-
- /**
-  * Writes a [[Table]] to a [[TableSink]].
-  *
-  * Abstract: no implementation is provided in this class.
-  *
-  * @param table The [[Table]] to write.
-  * @param sink The [[TableSink]] to write the [[Table]] to.
-  * @tparam T The data type that the [[TableSink]] expects.
-  */
- private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
-
- /**
-  * Adds a Calcite [[AbstractTable]] to the TableEnvironment's catalog. A name that is already
-  * in use is rejected, so existing registrations are not overwritten by this method.
-  *
-  * @param name The name under which the table is registered.
-  * @param table The table to register in the catalog
-  * @throws TableException if another table is registered under the provided name.
-  */
- @throws[TableException]
- protected def registerTableInternal(name: String, table: AbstractTable): Unit = {
-   // guard: refuse names that are already taken
-   if (isRegistered(name)) {
-     throw new TableException(s"Table \'$name\' already exists. " +
-       s"Please, choose a different name.")
-   }
-   tables.add(name, table)
- }
-
- /**
-  * Checks if the chosen table name is valid.
-  *
-  * Abstract: no implementation is provided in this class. [[registerTable]] calls this
-  * method before the table is added to the catalog.
-  *
-  * @param name The table name to check.
-  */
- protected def checkValidTableName(name: String): Unit
-
- /**
-  * Tells whether the catalog of this TableEnvironment contains a table with the given name.
-  *
-  * @param name The table name to look up.
-  * @return true, if a table is registered under the name, false otherwise.
-  */
- protected def isRegistered(name: String): Boolean =
-   tables.getTableNames.contains(name)
-
- /**
-  * Returns the row type of a registered table.
-  *
-  * @param name The name of the registered table.
-  * @return The row type of the table, created with the type factory of this environment.
-  * @throws TableException if no table is registered under the given name.
-  */
- protected def getRowType(name: String): RelDataType = {
-   // the catalog lookup yields null for unknown names; fail with a descriptive error
-   // instead of a NullPointerException
-   val table = Option(tables.getTable(name)).getOrElse {
-     throw new TableException(s"Table '$name' is not registered.")
-   }
-   table.getRowType(typeFactory)
- }
-
- /**
-  * Returns a unique temporary attribute name: the prefix "TMP_" followed by the next value
-  * of the attribute name counter.
-  */
- private[flink] def createUniqueAttributeName(): String =
-   s"TMP_${attrNameCntr.getAndIncrement()}"
-
- /** Gives access to the [[FlinkRelBuilder]] of this TableEnvironment. */
- private[flink] def getRelBuilder: FlinkRelBuilder = relBuilder
-
- /**
-  * Gives access to the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this
-  * TableEnvironment.
-  */
- private[flink] def getPlanner: RelOptPlanner = planner
-
- /** Gives access to the [[FlinkTypeFactory]] of this TableEnvironment. */
- private[flink] def getTypeFactory: FlinkTypeFactory = typeFactory
-
- /** Gives access to the [[FunctionCatalog]] of this TableEnvironment. */
- private[flink] def getFunctionCatalog: FunctionCatalog = functionCatalog
-
- /** Gives access to the Calcite [[FrameworkConfig]] of this TableEnvironment. */
- private[flink] def getFrameworkConfig: FrameworkConfig = frameworkConfig
-
- /**
-  * Checks that the class described by the given type information is usable: it must not be
-  * a non-static member class, it must be public, and it must have a canonical name.
-  * Throws a [[TableException]] otherwise.
-  *
-  * @param typeInfo The type information whose type class is checked.
-  */
- protected def validateType(typeInfo: TypeInformation[_]): Unit = {
-   val clazz = typeInfo.getTypeClass
-
-   // local defs (not vals) keep the short-circuit evaluation of the combined condition
-   def isNonStaticMember: Boolean = clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)
-   def isNotPublic: Boolean = !Modifier.isPublic(clazz.getModifiers)
-   def lacksCanonicalName: Boolean = clazz.getCanonicalName == null
-
-   if (isNonStaticMember || isNotPublic || lacksCanonicalName) {
-     throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
-       s"static and globally accessible.")
-   }
- }
-
- /**
-  * Returns field names and field positions for a given [[TypeInformation]].
-  *
-  * Field names are taken from the type information itself. This works for tuple, case class,
-  * POJO and row type information; any other type is rejected with a [[TableException]].
-  * The positions are the indexes of the extracted names (0 until the number of fields).
-  *
-  * @param inputType The TypeInformation extract the field names and positions from.
-  * @tparam A The type of the TypeInformation.
-  * @return A tuple of two arrays holding the field names and corresponding field positions.
-  */
- protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-     (Array[String], Array[Int]) = {
-
-   validateType(inputType)
-
-   val names: Array[String] = inputType match {
-     case tupleType: TupleTypeInfo[A] => tupleType.getFieldNames
-     case caseClassType: CaseClassTypeInfo[A] => caseClassType.getFieldNames
-     case pojoType: PojoTypeInfo[A] => pojoType.getFieldNames
-     case rowType: RowTypeInfo => rowType.getFieldNames
-     case other =>
-       throw new TableException(s"Type $other lacks explicit field naming")
-   }
-
-   // '*' is not accepted as a field name
-   if (names.contains("*")) {
-     throw new TableException("Field name can not be '*'.")
-   }
-
-   (names, names.indices.toArray)
- }
-
- /**
-  * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-  * [[Expression]].
-  *
-  * How an expression is mapped depends on the input type:
-  *  - atomic type: exactly one field reference is allowed; it names the field at position 0.
-  *  - tuple and case class type: a field reference names the field at the position of the
-  *    expression within `exprs`; an alias renames the existing field it refers to.
-  *  - POJO type: field references and aliases are both resolved by field name.
-  *
-  * Any other input type, any other kind of expression, an unknown field and the field name
-  * '*' are rejected with a [[TableException]].
-  *
-  * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-  * @param exprs The expressions that define the field names.
-  * @tparam A The type of the TypeInformation.
-  * @return A tuple of two arrays holding the field names and corresponding field positions.
-  */
- protected[flink] def getFieldInfo[A](
-     inputType: TypeInformation[A],
-     exprs: Array[Expression]): (Array[String], Array[Int]) = {
-
-   validateType(inputType)
-
-   // passes a looked-up field position through, or fails if the lookup found nothing
-   def requireField(fieldName: String, position: Int): Int = {
-     if (position < 0) {
-       throw new TableException(s"$fieldName is not a field of type $inputType")
-     }
-     position
-   }
-
-   // shared failure for expressions that are neither a field reference nor an alias on one
-   def unsupportedExpression(): Nothing = throw new TableException(
-     "Field reference expression or alias on field expression expected.")
-
-   val positionsAndNames: Array[(Int, String)] = inputType match {
-     case _: AtomicType[A] =>
-       if (exprs.length != 1) {
-         throw new TableException("Table of atomic type can only have a single field.")
-       }
-       exprs.map {
-         case UnresolvedFieldReference(name) => (0, name)
-         case _ => throw new TableException("Field reference expression expected.")
-       }
-
-     case tupleType: TupleTypeInfo[A] =>
-       exprs.zipWithIndex.map {
-         case (UnresolvedFieldReference(name), position) => (position, name)
-         case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-           (requireField(origName, tupleType.getFieldIndex(origName)), name)
-         case _ => unsupportedExpression()
-       }
-
-     case caseClassType: CaseClassTypeInfo[A] =>
-       exprs.zipWithIndex.map {
-         case (UnresolvedFieldReference(name), position) => (position, name)
-         case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-           (requireField(origName, caseClassType.getFieldIndex(origName)), name)
-         case _ => unsupportedExpression()
-       }
-
-     case pojoType: PojoTypeInfo[A] =>
-       exprs.map {
-         case UnresolvedFieldReference(name) =>
-           (requireField(name, pojoType.getFieldIndex(name)), name)
-         case Alias(UnresolvedFieldReference(origName), name, _) =>
-           (requireField(origName, pojoType.getFieldIndex(origName)), name)
-         case _ => unsupportedExpression()
-       }
-
-     case other => throw new TableException(
-       s"Source of type $other cannot be converted into Table.")
-   }
-
-   val (positions, names) = positionsAndNames.unzip
-
-   // '*' is not accepted as a field name
-   if (names.contains("*")) {
-     throw new TableException("Field name can not be '*'.")
-   }
-
-   (names.toArray, positions.toArray)
- }
-
-}
-
-/**
- * Factory for [[TableEnvironment]]s. The kind of TableEnvironment that is created depends on
- * the execution environment that is passed in (Java or Scala, batch or stream).
- */
-object TableEnvironment {
-
-  /**
-   * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]], configured with a newly
-   * created [[TableConfig]].
-   *
-   * @param executionEnvironment The Java batch ExecutionEnvironment.
-   */
-  def getTableEnvironment(executionEnvironment: JavaBatchExecEnv): JavaBatchTableEnv =
-    getTableEnvironment(executionEnvironment, new TableConfig())
-
-  /**
-   * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]] and a given [[TableConfig]].
-   *
-   * @param executionEnvironment The Java batch ExecutionEnvironment.
-   * @param tableConfig The TableConfig for the new TableEnvironment.
-   */
-  def getTableEnvironment(
-      executionEnvironment: JavaBatchExecEnv,
-      tableConfig: TableConfig): JavaBatchTableEnv =
-    new JavaBatchTableEnv(executionEnvironment, tableConfig)
-
-  /**
-   * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]], configured with a newly
-   * created [[TableConfig]].
-   *
-   * @param executionEnvironment The Scala batch ExecutionEnvironment.
-   */
-  def getTableEnvironment(executionEnvironment: ScalaBatchExecEnv): ScalaBatchTableEnv =
-    getTableEnvironment(executionEnvironment, new TableConfig())
-
-  /**
-   * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]] and a given
-   * [[TableConfig]].
-   *
-   * @param executionEnvironment The Scala batch ExecutionEnvironment.
-   * @param tableConfig The TableConfig for the new TableEnvironment.
-   */
-  def getTableEnvironment(
-      executionEnvironment: ScalaBatchExecEnv,
-      tableConfig: TableConfig): ScalaBatchTableEnv =
-    new ScalaBatchTableEnv(executionEnvironment, tableConfig)
-
-  /**
-   * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]], configured with a newly
-   * created [[TableConfig]].
-   *
-   * @param executionEnvironment The Java StreamExecutionEnvironment.
-   */
-  def getTableEnvironment(executionEnvironment: JavaStreamExecEnv): JavaStreamTableEnv =
-    getTableEnvironment(executionEnvironment, new TableConfig())
-
-  /**
-   * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]] and a given
-   * [[TableConfig]].
-   *
-   * @param executionEnvironment The Java StreamExecutionEnvironment.
-   * @param tableConfig The TableConfig for the new TableEnvironment.
-   */
-  def getTableEnvironment(
-      executionEnvironment: JavaStreamExecEnv,
-      tableConfig: TableConfig): JavaStreamTableEnv =
-    new JavaStreamTableEnv(executionEnvironment, tableConfig)
-
-  /**
-   * Returns a [[ScalaStreamTableEnv]] for a Scala stream [[ScalaStreamExecEnv]], configured
-   * with a newly created [[TableConfig]].
-   *
-   * @param executionEnvironment The Scala StreamExecutionEnvironment.
-   */
-  def getTableEnvironment(executionEnvironment: ScalaStreamExecEnv): ScalaStreamTableEnv =
-    getTableEnvironment(executionEnvironment, new TableConfig())
-
-  /**
-   * Returns a [[ScalaStreamTableEnv]] for a Scala stream [[ScalaStreamExecEnv]] and a given
-   * [[TableConfig]].
-   *
-   * @param executionEnvironment The Scala StreamExecutionEnvironment.
-   * @param tableConfig The TableConfig for the new TableEnvironment.
-   */
-  def getTableEnvironment(
-      executionEnvironment: ScalaStreamExecEnv,
-      tableConfig: TableConfig): ScalaStreamTableEnv =
-    new ScalaStreamTableEnv(executionEnvironment, tableConfig)
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
deleted file mode 100644
index a988152..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-
-/**
- * This object enumerates all supported types of the Table API.
- *
- * Each value is an alias for a type information constant of [[BasicTypeInfo]],
- * [[SqlTimeTypeInfo]] or [[TimeIntervalTypeInfo]].
- */
-object Types {
-
- // character and boolean types
- val STRING = BasicTypeInfo.STRING_TYPE_INFO
- val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
- // numeric types; DECIMAL refers to the BIG_DEC type information
- val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
- val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
- val INT = BasicTypeInfo.INT_TYPE_INFO
- val LONG = BasicTypeInfo.LONG_TYPE_INFO
- val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
- val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
- val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
-
- // SQL date/time types and time interval types
- val DATE = SqlTimeTypeInfo.DATE
- val TIME = SqlTimeTypeInfo.TIME
- val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
- val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
- val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala
deleted file mode 100644
index 8b7559f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-/**
- * Exception for all errors occurring during code generation.
- *
- * @param msg The message that is passed on to the [[RuntimeException]] constructor.
- */
-class CodeGenException(msg: String) extends RuntimeException(msg)