Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:09 UTC

[40/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
deleted file mode 100644
index 6d00ab3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.{Programs, RuleSet}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.explain.PlanJsonParser
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
-import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable}
-import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
-import org.apache.flink.api.table.sources.BatchTableSource
-import org.apache.flink.types.Row
-
-/**
-  * The abstract base class for batch TableEnvironments.
-  *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataSet]] to a [[Table]]
-  * - register a [[DataSet]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[org.apache.flink.api.table.TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataSet]]
-  * - explain the AST and execution plan of a [[Table]]
-  *
-  * @param execEnv The [[ExecutionEnvironment]] which is wrapped in this [[BatchTableEnvironment]].
-  * @param config The [[TableConfig]] of this [[BatchTableEnvironment]].
-  */
-abstract class BatchTableEnvironment(
-    private[flink] val execEnv: ExecutionEnvironment,
-    config: TableConfig)
-  extends TableEnvironment(config) {
-
-  // a counter for unique table names.
-  private val nameCntr: AtomicInteger = new AtomicInteger(0)
-
-  // the naming pattern for internally registered tables.
-  private val internalNamePattern = "^_DataSetTable_[0-9]+$".r
-
-  /**
-    * Checks if the chosen table name is valid.
-    *
-    * @param name The table name to check.
-    */
-  override protected def checkValidTableName(name: String): Unit = {
-    val m = internalNamePattern.findFirstIn(name)
-    m match {
-      case Some(_) =>
-        throw new TableException(s"Illegal Table name. " +
-          s"Please choose a name that does not contain the pattern $internalNamePattern")
-      case None =>
-    }
-  }
-
-  /** Returns a unique table name according to the internal naming pattern. */
-  protected def createUniqueTableName(): String = "_DataSetTable_" + nameCntr.getAndIncrement()
-
-  /**
-    * Scans a registered table and returns the resulting [[Table]].
-    *
-    * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
-    *
-    * @param tableName The name of the table to scan.
-    * @throws ValidationException if no table is registered under the given name.
-    * @return The scanned table.
-    */
-  @throws[ValidationException]
-  def scan(tableName: String): Table = {
-    if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
-    } else {
-      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
-    }
-  }
-
-  /**
-    * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * @param name The name under which the [[BatchTableSource]] is registered.
-    * @param tableSource The [[BatchTableSource]] to register.
-    */
-  def registerTableSource(name: String, tableSource: BatchTableSource[_]): Unit = {
-
-    checkValidTableName(name)
-    registerTableInternal(name, new TableSourceTable(tableSource))
-  }
-
-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    *
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  override def sql(query: String): Table = {
-
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
-    // parse the sql query
-    val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
-
-    new Table(this, LogicalRelNode(relational.rel))
-  }
-
-  /**
-    * Writes a [[Table]] to a [[TableSink]].
-    *
-    * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
-    * [[TableSink]] to write it.
-    *
-    * @param table The [[Table]] to write.
-    * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
-    */
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
-
-    sink match {
-      case batchSink: BatchTableSink[T] =>
-        val outputType = sink.getOutputType
-        // translate the Table into a DataSet and provide the type that the TableSink expects.
-        val result: DataSet[T] = translate(table)(outputType)
-        // Give the DataSet to the TableSink to emit it.
-        batchSink.emitDataSet(result)
-      case _ =>
-        throw new TableException("BatchTableSink required to emit batch Table")
-    }
-  }
-
-  /**
-    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
-    * the result of the given [[Table]].
-    *
-    * @param table The table for which the AST and execution plan will be returned.
-    * @param extended Flag to include detailed optimizer estimates.
-    */
-  private[flink] def explain(table: Table, extended: Boolean): String = {
-    val ast = table.getRelNode
-    val optimizedPlan = optimize(ast)
-    val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
-    dataSet.output(new DiscardingOutputFormat[Row])
-    val env = dataSet.getExecutionEnvironment
-    val jsonSqlPlan = env.getExecutionPlan
-    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, extended)
-
-    s"== Abstract Syntax Tree ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(ast)}" +
-        System.lineSeparator +
-        s"== Optimized Logical Plan ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(optimizedPlan)}" +
-        System.lineSeparator +
-        s"== Physical Execution Plan ==" +
-        System.lineSeparator +
-        s"$sqlPlan"
-  }
-
-  /**
-    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
-    * the result of the given [[Table]].
-    *
-    * @param table The table for which the AST and execution plan will be returned.
-    */
-  def explain(table: Table): String = explain(table, extended = false)
-
-  /**
-    * Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog.
-    *
-    * @param name The name under which the table is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register as table in the catalog.
-    * @tparam T the type of the [[DataSet]].
-    */
-  protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = {
-
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType)
-    val dataSetTable = new DataSetTable[T](
-      dataSet,
-      fieldIndexes,
-      fieldNames
-    )
-    registerTableInternal(name, dataSetTable)
-  }
-
-  /**
-    * Registers a [[DataSet]] as a table under a given name with field names as specified by
-    * field expressions in the [[TableEnvironment]]'s catalog.
-    *
-    * @param name The name under which the table is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register as table in the catalog.
-    * @param fields The field expressions to define the field names of the table.
-    * @tparam T The type of the [[DataSet]].
-    */
-  protected def registerDataSetInternal[T](
-      name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
-
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields)
-    val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
-    registerTableInternal(name, dataSetTable)
-  }
-
-  /**
-    * Returns the built-in rules that are defined by the environment.
-    */
-  protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES
-
-  /**
-    * Generates the optimized [[RelNode]] tree from the original relational node tree.
-    *
-    * @param relNode The original [[RelNode]] tree
-    * @return The optimized [[RelNode]] tree
-    */
-  private[flink] def optimize(relNode: RelNode): RelNode = {
-
-    // decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
-
-    // optimize the logical Flink plan
-    val optProgram = Programs.ofRules(getRuleSet)
-    val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-
-    val dataSetPlan = try {
-      optProgram.run(getPlanner, decorPlan, flinkOutputProps)
-    } catch {
-      case e: CannotPlanException =>
-        throw new TableException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-            s"${RelOptUtil.toString(relNode)}\n" +
-            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
-            s"Please check the documentation for the set of currently supported SQL features.")
-      case t: TableException =>
-        throw new TableException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-            s"${RelOptUtil.toString(relNode)}\n" +
-            s"${t.msg}\n" +
-            s"Please check the documentation for the set of currently supported SQL features.")
-      case a: AssertionError =>
-        throw a.getCause
-    }
-    dataSetPlan
-  }
-
-  /**
-    * Translates a [[Table]] into a [[DataSet]].
-    *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
-    *
-    * @param table The root node of the relational expression tree.
-    * @param tpe   The [[TypeInformation]] of the resulting [[DataSet]].
-    * @tparam A The type of the resulting [[DataSet]].
-    * @return The [[DataSet]] that corresponds to the translated [[Table]].
-    */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-    val dataSetPlan = optimize(table.getRelNode)
-    translate(dataSetPlan)
-  }
-
-  /**
-    * Translates a logical [[RelNode]] into a [[DataSet]].
-    *
-    * @param logicalPlan The root node of the relational expression tree.
-    * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
-    * @tparam A The type of the resulting [[DataSet]].
-    * @return The [[DataSet]] that corresponds to the translated [[Table]].
-    */
-  protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-    validateType(tpe)
-
-    logicalPlan match {
-      case node: DataSetRel =>
-        node.translateToPlan(
-          this,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[DataSet[A]]
-      case _ =>
-        throw new TableException("Cannot generate DataSet due to an invalid logical plan.")
-    }
-  }
-}

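The environment above ties together registration, SQL translation, and plan
optimization. For reference, a minimal usage sketch, assuming the Scala batch
environment subclass and the TableEnvironment.getTableEnvironment factory
(neither is part of this diff):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.types.Row

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // registration goes through registerDataSetInternal and getFieldInfo
    val orders = env.fromElements((1L, "beer", 2.50), (2L, "wine", 9.99))
    tEnv.registerDataSet("Orders", orders, 'id, 'product, 'price)

    // sql() parses, validates, and converts the query via FlinkPlannerImpl
    val result = tEnv.sql("SELECT product, SUM(price) FROM Orders GROUP BY product")

    // the conversion runs optimize() and translate() to emit DataSet operators
    val rows = result.toDataSet[Row]
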
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
deleted file mode 100644
index 06b3edc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/CalciteConfig.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.plan.RelOptRule
-import org.apache.calcite.sql.SqlOperatorTable
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable
-import org.apache.calcite.tools.{RuleSets, RuleSet}
-import org.apache.flink.util.Preconditions
-
-import scala.collection.JavaConverters._
-
-/**
-  * Builder for creating a Calcite configuration.
-  */
-class CalciteConfigBuilder {
-  private var replaceRules: Boolean = false
-  private var ruleSets: List[RuleSet] = Nil
-
-  private var replaceOperatorTable: Boolean = false
-  private var operatorTables: List[SqlOperatorTable] = Nil
-
-  private var replaceSqlParserConfig: Option[SqlParser.Config] = None
-
-  /**
-    * Replaces the built-in rule set with the given rule set.
-    */
-  def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
-    Preconditions.checkNotNull(replaceRuleSet)
-    ruleSets = List(replaceRuleSet)
-    replaceRules = true
-    this
-  }
-
-  /**
-    * Appends the given rule set to the built-in rule set.
-    */
-  def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
-    Preconditions.checkNotNull(addedRuleSet)
-    ruleSets = addedRuleSet :: ruleSets
-    this
-  }
-
-  /**
-    * Replaces the built-in SQL operator table with the given table.
-    */
-  def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
-    Preconditions.checkNotNull(replaceSqlOperatorTable)
-    operatorTables = List(replaceSqlOperatorTable)
-    replaceOperatorTable = true
-    this
-  }
-
-  /**
-    * Appends the given table to the built-in SQL operator table.
-    */
-  def addSqlOperatorTable(addedSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
-    Preconditions.checkNotNull(addedSqlOperatorTable)
-    this.operatorTables = addedSqlOperatorTable :: this.operatorTables
-    this
-  }
-
-  /**
-    * Replaces the built-in SQL parser configuration with the given configuration.
-    */
-  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder = {
-    Preconditions.checkNotNull(sqlParserConfig)
-    replaceSqlParserConfig = Some(sqlParserConfig)
-    this
-  }
-
-  private class CalciteConfigImpl(
-      val getRuleSet: Option[RuleSet],
-      val replacesRuleSet: Boolean,
-      val getSqlOperatorTable: Option[SqlOperatorTable],
-      val replacesSqlOperatorTable: Boolean,
-      val getSqlParserConfig: Option[SqlParser.Config])
-    extends CalciteConfig
-
-  /**
-    * Builds a new [[CalciteConfig]].
-    */
-  def build(): CalciteConfig = new CalciteConfigImpl(
-    ruleSets match {
-      case Nil => None
-      case h :: Nil => Some(h)
-      case _ =>
-        // concat rule sets
-        val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])( (c, r) => r.asScala ++ c)
-        Some(RuleSets.ofList(concatRules.asJava))
-    },
-    this.replaceRules,
-    operatorTables match {
-      case Nil => None
-      case h :: Nil => Some(h)
-      case _ =>
-        // chain operator tables
-        Some(operatorTables.reduce( (x, y) => ChainedSqlOperatorTable.of(x, y)))
-    },
-    this.replaceOperatorTable,
-    replaceSqlParserConfig)
-}
-
-/**
-  * Calcite configuration for defining a custom Calcite configuration for Table and SQL API.
-  */
-trait CalciteConfig {
-  /**
-    * Returns whether this configuration replaces the built-in rule set.
-    */
-  def replacesRuleSet: Boolean
-
-  /**
-    * Returns a custom rule set.
-    */
-  def getRuleSet: Option[RuleSet]
-
-  /**
-    * Returns whether this configuration replaces the built-in SQL operator table.
-    */
-  def replacesSqlOperatorTable: Boolean
-
-  /**
-    * Returns a custom SQL operator table.
-    */
-  def getSqlOperatorTable: Option[SqlOperatorTable]
-
-  /**
-    * Returns a custom SQL parser configuration.
-    */
-  def getSqlParserConfig: Option[SqlParser.Config]
-}
-
-object CalciteConfig {
-
-  val DEFAULT = createBuilder().build()
-
-  /**
-    * Creates a new builder for constructing a [[CalciteConfig]].
-    */
-  def createBuilder(): CalciteConfigBuilder = {
-    new CalciteConfigBuilder
-  }
-}

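A sketch of the builder in use, appending one Calcite rule to the built-in
rule set and replacing the parser configuration; the final setCalciteConfig
call on TableConfig is an assumed hook and does not appear in this file:

    import org.apache.calcite.rel.rules.ReduceExpressionsRule
    import org.apache.calcite.sql.parser.SqlParser
    import org.apache.calcite.tools.RuleSets

    val calciteConfig = CalciteConfig.createBuilder()
      .addRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
      .replaceSqlParserConfig(
        SqlParser.configBuilder().setCaseSensitive(false).build())
      .build()

    // assumed hook: hand the custom config to the table environment
    tableEnv.getConfig.setCalciteConfig(calciteConfig)
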
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
deleted file mode 100644
index b1ccc09..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.adapter.java.JavaTypeFactory
-import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable}
-import org.apache.calcite.sql.validate.{SqlValidatorImpl, SqlConformance}
-
-/**
- * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]].
- */
-class FlinkCalciteSqlValidator(
-    opTab: SqlOperatorTable,
-    catalogReader: CalciteCatalogReader,
-    typeFactory: JavaTypeFactory) extends SqlValidatorImpl(
-        opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) {
-
-  override def getLogicalSourceRowType(
-      sourceRowType: RelDataType,
-      insert: SqlInsert): RelDataType = {
-    typeFactory.asInstanceOf[JavaTypeFactory].toSql(sourceRowType)
-  }
-
-  override def getLogicalTargetRowType(
-      targetRowType: RelDataType,
-      insert: SqlInsert): RelDataType = {
-    typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
deleted file mode 100644
index 131cdc6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan.RelOptTable.ViewExpander
-import org.apache.calcite.plan._
-import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rel.RelRoot
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.RexBuilder
-import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.sql.parser.{SqlParseException => CSqlParseException, SqlParser}
-import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
-import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
-import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
-
-import scala.collection.JavaConversions._
-
-/**
-  * NOTE: this is heavily inspired by Calcite's PlannerImpl.
-  * We need it in order to share the planner between the Table API relational plans
-  * and the SQL relational plans that are created by the Calcite parser.
-  * The main difference is that we do not create a new RelOptPlanner in the ready() method.
-  */
-class FlinkPlannerImpl(
-    config: FrameworkConfig,
-    planner: RelOptPlanner,
-    typeFactory: FlinkTypeFactory) {
-
-  val operatorTable: SqlOperatorTable = config.getOperatorTable
-  /** Holds the trait definitions to be registered with the planner. May be null. */
-  val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
-  val parserConfig: SqlParser.Config = config.getParserConfig
-  val convertletTable: SqlRexConvertletTable = config.getConvertletTable
-  val defaultSchema: SchemaPlus = config.getDefaultSchema
-
-  var validator: FlinkCalciteSqlValidator = _
-  var validatedSqlNode: SqlNode = _
-  var root: RelRoot = _
-
-  private def ready() {
-    if (this.traitDefs != null) {
-      planner.clearRelTraitDefs()
-      for (traitDef <- this.traitDefs) {
-        planner.addRelTraitDef(traitDef)
-      }
-    }
-  }
-
-  def parse(sql: String): SqlNode = {
-    try {
-      ready()
-      val parser: SqlParser = SqlParser.create(sql, parserConfig)
-      val sqlNode: SqlNode = parser.parseStmt
-      sqlNode
-    } catch {
-      case e: CSqlParseException =>
-        throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
-    }
-  }
-
-  def validate(sqlNode: SqlNode): SqlNode = {
-    validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
-    validator.setIdentifierExpansion(true)
-    try {
-      validatedSqlNode = validator.validate(sqlNode)
-    }
-    catch {
-      case e: RuntimeException =>
-        throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
-    }
-    validatedSqlNode
-  }
-
-  def rel(sql: SqlNode): RelRoot = {
-    try {
-      assert(validatedSqlNode != null)
-      val rexBuilder: RexBuilder = createRexBuilder
-      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
-      val config = SqlToRelConverter.configBuilder()
-        .withTrimUnusedFields(false).withConvertTableAccess(false).build()
-      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-        new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
-      root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
-      // we disable automatic flattening in order to let composite types pass without modification
-      // we might enable it again once Calcite has better support for structured types
-      // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
-      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
-      root
-    } catch {
-      case e: RelConversionException => throw TableException(e.getMessage)
-    }
-  }
-
-  /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
-    * interface for [[org.apache.calcite.tools.Planner]]. */
-  class ViewExpanderImpl extends ViewExpander {
-
-    override def expandView(
-        rowType: RelDataType,
-        queryString: String,
-        schemaPath: util.List[String],
-        viewPath: util.List[String]): RelRoot = {
-
-      val parser: SqlParser = SqlParser.create(queryString, parserConfig)
-      var sqlNode: SqlNode = null
-      try {
-        sqlNode = parser.parseQuery
-      }
-      catch {
-        case e: CSqlParseException =>
-          throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
-      }
-      val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
-      val validator: SqlValidator =
-        new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
-      validator.setIdentifierExpansion(true)
-      val validatedSqlNode: SqlNode = validator.validate(sqlNode)
-      val rexBuilder: RexBuilder = createRexBuilder
-      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
-      val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
-        .withTrimUnusedFields(false).withConvertTableAccess(false).build
-      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-        new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
-      root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
-      root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
-      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
-      FlinkPlannerImpl.this.root
-    }
-  }
-
-  private def createCatalogReader: CalciteCatalogReader = {
-    val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
-    new CalciteCatalogReader(
-      CalciteSchema.from(rootSchema),
-      parserConfig.caseSensitive,
-      CalciteSchema.from(defaultSchema).path(null),
-      typeFactory)
-  }
-
-  private def createRexBuilder: RexBuilder = {
-    new RexBuilder(typeFactory)
-  }
-
-}
-
-object FlinkPlannerImpl {
-  private def rootSchema(schema: SchemaPlus): SchemaPlus = {
-    if (schema.getParentSchema == null) {
-      schema
-    }
-    else {
-      rootSchema(schema.getParentSchema)
-    }
-  }
-}

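The parse/validate/rel triple above is exactly what sql() in the batch and
stream environments calls. A condensed sketch; frameworkConfig, relOptPlanner,
and typeFactory are assumed to come from the enclosing TableEnvironment
(getFrameworkConfig, getPlanner, getTypeFactory):

    import org.apache.calcite.rel.RelNode

    val flinkPlanner = new FlinkPlannerImpl(frameworkConfig, relOptPlanner, typeFactory)

    // 1. SQL text -> SqlNode (throws SqlParserException on bad syntax)
    val parsed = flinkPlanner.parse("SELECT a, b FROM T")
    // 2. resolve names and types against the catalog (throws ValidationException)
    val validated = flinkPlanner.validate(parsed)
    // 3. SqlNode -> decorrelated relational tree
    val root = flinkPlanner.rel(validated)
    val logicalPlan: RelNode = root.rel
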
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
deleted file mode 100644
index 8508e53..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util.Collections
-
-import org.apache.calcite.plan.volcano.VolcanoPlanner
-import java.lang.Iterable
-
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan._
-import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.calcite.rex.RexBuilder
-import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
-import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.expressions.WindowProperty
-import org.apache.flink.api.table.plan.logical.LogicalWindow
-import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate
-
-/**
-  * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
-  */
-class FlinkRelBuilder(
-    context: Context,
-    relOptCluster: RelOptCluster,
-    relOptSchema: RelOptSchema)
-  extends RelBuilder(
-    context,
-    relOptCluster,
-    relOptSchema) {
-
-  def getPlanner: RelOptPlanner = cluster.getPlanner
-
-  def getCluster: RelOptCluster = relOptCluster
-
-  override def getTypeFactory: FlinkTypeFactory =
-    super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-  def aggregate(
-      window: LogicalWindow,
-      groupKey: GroupKey,
-      namedProperties: Seq[NamedWindowProperty],
-      aggCalls: Iterable[AggCall])
-    : RelBuilder = {
-    // build logical aggregate
-    val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]
-
-    // build logical window aggregate from it
-    push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
-    this
-  }
-
-}
-
-object FlinkRelBuilder {
-
-  def create(config: FrameworkConfig): FlinkRelBuilder = {
-
-    // create Flink type factory
-    val typeSystem = config.getTypeSystem
-    val typeFactory = new FlinkTypeFactory(typeSystem)
-
-    // create context instances with Flink type factory
-    val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
-    planner.setExecutor(config.getExecutor)
-    planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
-    val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
-    val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
-    val relOptSchema = new CalciteCatalogReader(
-      calciteSchema,
-      config.getParserConfig.caseSensitive(),
-      Collections.emptyList(),
-      typeFactory)
-
-    new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
-  }
-
-  /**
-    * Information necessary to create a window aggregate.
-    *
-    * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
-    */
-  case class NamedWindowProperty(name: String, property: WindowProperty)
-
-}

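Because create() installs a FlinkTypeFactory and a VolcanoPlanner, plans built
with the ordinary RelBuilder calls use Flink's type mapping. A small sketch;
frameworkConfig is assumed, and the "Orders" table must be registered in the
config's default schema:

    import org.apache.calcite.sql.fun.SqlStdOperatorTable

    val relBuilder = FlinkRelBuilder.create(frameworkConfig)
    val plan = relBuilder
      .scan("Orders")
      .filter(relBuilder.call(
        SqlStdOperatorTable.GREATER_THAN,
        relBuilder.field("price"),
        relBuilder.literal(10)))
      .build()
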
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
deleted file mode 100644
index 8dcd660..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
-import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.java.typeutils.ValueTypeInfo._
-import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
-import org.apache.flink.api.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple
-
-import scala.collection.mutable
-
-/**
-  * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
-  * and Calcite's [[RelDataType]].
-  */
-class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
-
-  // NOTE: for future data types it might be necessary to
-  // override more methods of RelDataTypeFactoryImpl
-
-  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
-
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
-    // simple types can be converted to SQL types and vice versa
-    if (isSimple(typeInfo)) {
-      val sqlType = typeInfoToSqlTypeName(typeInfo)
-      sqlType match {
-
-        case INTERVAL_YEAR_MONTH =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
-
-        case INTERVAL_DAY_SECOND =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
-
-        case _ =>
-          createSqlType(sqlType)
-      }
-    }
-    // advanced types require specific RelDataType
-    // for storing the original TypeInformation
-    else {
-      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
-    }
-  }
-
-  /**
-    * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
-    *
-    * @param fieldNames field names
-    * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
-    * @return a struct type with the input fieldNames and input fieldTypes
-    */
-  def buildRowDataType(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]])
-    : RelDataType = {
-    val rowDataTypeBuilder = builder
-    fieldNames
-      .zip(fieldTypes)
-      .foreach { f =>
-        rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
-      }
-    rowDataTypeBuilder.build
-  }
-
-  override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
-    // it might happen that inferred VARCHAR types overflow, as we set them to Int.MaxValue;
-    // always reset those to the default precision
-    if (typeName == VARCHAR && precision < 0) {
-      createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
-    } else {
-      super.createSqlType(typeName, precision)
-    }
-  }
-
-  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
-    new ArrayRelDataType(
-      ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
-      elementType,
-      true)
-
-  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    case ct: CompositeType[_] =>
-      new CompositeRelDataType(ct, this)
-
-    case pa: PrimitiveArrayTypeInfo[_] =>
-      new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
-
-    case oa: ObjectArrayTypeInfo[_, _] =>
-      new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
-
-    case ti: TypeInformation[_] =>
-      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
-
-    case ti@_ =>
-      throw TableException(s"Unsupported type information: $ti")
-  }
-
-  override def createTypeWithNullability(
-    relDataType: RelDataType,
-    nullable: Boolean)
-  : RelDataType = relDataType match {
-    case composite: CompositeRelDataType =>
-      // at the moment we do not care about nullability
-      composite
-    case _ =>
-      super.createTypeWithNullability(relDataType, nullable)
-  }
-}
-
-object FlinkTypeFactory {
-
-  private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
-      case BOOLEAN_TYPE_INFO => BOOLEAN
-      case BYTE_TYPE_INFO => TINYINT
-      case SHORT_TYPE_INFO => SMALLINT
-      case INT_TYPE_INFO => INTEGER
-      case LONG_TYPE_INFO => BIGINT
-      case FLOAT_TYPE_INFO => FLOAT
-      case DOUBLE_TYPE_INFO => DOUBLE
-      case STRING_TYPE_INFO => VARCHAR
-      case BIG_DEC_TYPE_INFO => DECIMAL
-
-      // temporal types
-      case SqlTimeTypeInfo.DATE => DATE
-      case SqlTimeTypeInfo.TIME => TIME
-      case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
-      case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
-      case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
-
-      case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
-        throw TableException("Character type is not supported.")
-
-      case _@t =>
-        throw TableException(s"Type is not supported: $t")
-  }
-
-  def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
-    case BOOLEAN => BOOLEAN_TYPE_INFO
-    case TINYINT => BYTE_TYPE_INFO
-    case SMALLINT => SHORT_TYPE_INFO
-    case INTEGER => INT_TYPE_INFO
-    case BIGINT => LONG_TYPE_INFO
-    case FLOAT => FLOAT_TYPE_INFO
-    case DOUBLE => DOUBLE_TYPE_INFO
-    case VARCHAR | CHAR => STRING_TYPE_INFO
-    case DECIMAL => BIG_DEC_TYPE_INFO
-
-    // temporal types
-    case DATE => SqlTimeTypeInfo.DATE
-    case TIME => SqlTimeTypeInfo.TIME
-    case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
-    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS
-    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS
-
-    case NULL =>
-      throw TableException("Type NULL is not supported. Null values must have a supported type.")
-
-    // symbols for special flags, e.g. TRIM's BOTH, LEADING, TRAILING,
-    // are represented as integers
-    case SYMBOL => INT_TYPE_INFO
-
-    // extract encapsulated TypeInformation
-    case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
-      val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
-      genericRelDataType.typeInfo
-
-    case ROW if relDataType.isInstanceOf[CompositeRelDataType] =>
-      val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
-      compositeRelDataType.compositeType
-
-    // ROW and CURSOR in the UDTF case; their type info is never used and is just a placeholder
-    case ROW | CURSOR => new NothingTypeInfo
-
-    case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
-      val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
-      arrayRelDataType.typeInfo
-
-    case _@t =>
-      throw TableException(s"Type is not supported: $t")
-  }
-}

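Both directions of the mapping can be exercised directly; a simple type such
as LONG_TYPE_INFO round-trips through BIGINT:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo

    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)

    // TypeInformation -> RelDataType: LONG_TYPE_INFO maps to BIGINT
    val relType = typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO)

    // RelDataType -> TypeInformation: BIGINT maps back to LONG_TYPE_INFO
    val typeInfo = FlinkTypeFactory.toTypeInfo(relType)
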
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
deleted file mode 100644
index 3222eee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
-import org.apache.calcite.sql.`type`.SqlTypeName
-
-/**
-  * Custom type system for Flink.
-  */
-class FlinkTypeSystem extends RelDataTypeSystemImpl {
-
-  // we cannot use Int.MaxValue because of an overflow in Calcite's type inference logic
-  // half should be enough for all use cases
-  override def getMaxNumericScale: Int = Int.MaxValue / 2
-
-  // we cannot use Int.MaxValue because of an overflow in Calcite's type inference logic
-  // half should be enough for all use cases
-  override def getMaxNumericPrecision: Int = Int.MaxValue / 2
-
-  override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match {
-
-    // by default all VARCHARs can have the Java default length
-    case SqlTypeName.VARCHAR =>
-      Int.MaxValue
-
-    // we currently support only timestamps with millisecond precision
-    case SqlTypeName.TIMESTAMP =>
-      3
-
-    case _ =>
-      super.getDefaultPrecision(typeName)
-  }
-
-}

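The effect of the overrides is easy to check in isolation:

    import org.apache.calcite.sql.`type`.SqlTypeName

    val typeSystem = new FlinkTypeSystem
    typeSystem.getDefaultPrecision(SqlTypeName.VARCHAR)    // Int.MaxValue
    typeSystem.getDefaultPrecision(SqlTypeName.TIMESTAMP)  // 3, i.e. milliseconds
    typeSystem.getMaxNumericPrecision                      // Int.MaxValue / 2
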
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
deleted file mode 100644
index da20e07..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.{Programs, RuleSet}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.explain.PlanJsonParser
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
-import org.apache.flink.api.table.plan.rules.FlinkRuleSets
-import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable}
-import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
-import org.apache.flink.api.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.types.Row
-
-/**
-  * The base class for stream TableEnvironments.
-  *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataStream]] to a [[Table]]
-  * - register a [[DataStream]] as a table in the catalog
-  * - register a [[Table]] in the catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataStream]]
-  *
-  * @param execEnv The [[StreamExecutionEnvironment]] which is wrapped in this
-  *                [[StreamTableEnvironment]].
-  * @param config The [[TableConfig]] of this [[StreamTableEnvironment]].
-  */
-abstract class StreamTableEnvironment(
-    private[flink] val execEnv: StreamExecutionEnvironment,
-    config: TableConfig)
-  extends TableEnvironment(config) {
-
-  // a counter for unique table names
-  private val nameCntr: AtomicInteger = new AtomicInteger(0)
-
-  // the naming pattern for internally registered tables.
-  private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
-
-  /**
-    * Checks if the chosen table name is valid.
-    *
-    * @param name The table name to check.
-    */
-  override protected def checkValidTableName(name: String): Unit = {
-    val m = internalNamePattern.findFirstIn(name)
-    m match {
-      case Some(_) =>
-        throw new TableException(s"Illegal Table name. " +
-          s"Please choose a name that does not contain the pattern $internalNamePattern")
-      case None =>
-    }
-  }
-
-  /** Returns a unique table name according to the internal naming pattern. */
-  protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]].
-    *
-    * Field names are automatically extracted for
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    * The method fails if inputType is not a
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    *
-    * @param inputType The TypeInformation to extract the field names and positions from.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
-    : (Array[String], Array[Int]) = {
-    val fieldInfo = super.getFieldInfo(inputType)
-    if (fieldInfo._1.contains("rowtime")) {
-      throw new TableException("'rowtime' ia a reserved field name in stream environment.")
-    }
-    fieldInfo
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
-    *
-    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-    * @param exprs     The expressions that define the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  override protected[flink] def getFieldInfo[A](
-      inputType: TypeInformation[A],
-      exprs: Array[Expression])
-    : (Array[String], Array[Int]) = {
-    val fieldInfo = super.getFieldInfo(inputType, exprs)
-    if (fieldInfo._1.contains("rowtime")) {
-      throw new TableException("'rowtime' is a reserved field name in stream environment.")
-    }
-    fieldInfo
-  }
-
-  /**
-    * Ingests a registered table and returns the resulting [[Table]].
-    *
-    * The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
-    *
-    * @param tableName The name of the table to ingest.
-    * @throws ValidationException if no table is registered under the given name.
-    * @return The ingested table.
-    */
-  @throws[ValidationException]
-  def ingest(tableName: String): Table = {
-
-    if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
-    }
-    else {
-      throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
-    }
-  }
-
-  /**
-    * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * @param name        The name under which the [[StreamTableSource]] is registered.
-    * @param tableSource The [[org.apache.flink.api.table.sources.StreamTableSource]] to register.
-    */
-  def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = {
-
-    checkValidTableName(name)
-    registerTableInternal(name, new TableSourceTable(tableSource))
-  }
-
-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    *
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  override def sql(query: String): Table = {
-
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
-    // parse the sql query
-    val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
-
-    new Table(this, LogicalRelNode(relational.rel))
-  }
-
-  /**
-    * Writes a [[Table]] to a [[TableSink]].
-    *
-    * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
-    * [[TableSink]] to write it.
-    *
-    * @param table The [[Table]] to write.
-    * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
-    */
-  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
-
-    sink match {
-      case streamSink: StreamTableSink[T] =>
-        val outputType = sink.getOutputType
-        // translate the Table into a DataStream and provide the type that the TableSink expects.
-        val result: DataStream[T] = translate(table)(outputType)
-        // Give the DataStream to the TableSink to emit it.
-        streamSink.emitDataStream(result)
-      case _ =>
-        throw new TableException("StreamTableSink required to emit streaming Table")
-    }
-  }
-
-  /**
-    * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
-    * catalog.
-    *
-    * @param name The name under which the table is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register as table in the catalog.
-    * @tparam T the type of the [[DataStream]].
-    */
-  protected def registerDataStreamInternal[T](
-    name: String,
-    dataStream: DataStream[T]): Unit = {
-
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
-    val dataStreamTable = new DataStreamTable[T](
-      dataStream,
-      fieldIndexes,
-      fieldNames
-    )
-    registerTableInternal(name, dataStreamTable)
-  }
-
-  /**
-    * Registers a [[DataStream]] as a table under a given name with field names as specified by
-    * field expressions in the [[TableEnvironment]]'s catalog.
-    *
-    * @param name The name under which the table is registered in the catalog.
-    * @param dataStream The [[DataStream]] to register as table in the catalog.
-    * @param fields The field expressions to define the field names of the table.
-    * @tparam T The type of the [[DataStream]].
-    */
-  protected def registerDataStreamInternal[T](
-    name: String,
-    dataStream: DataStream[T],
-    fields: Array[Expression]): Unit = {
-
-    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
-    val dataStreamTable = new DataStreamTable[T](
-      dataStream,
-      fieldIndexes,
-      fieldNames
-    )
-    registerTableInternal(name, dataStreamTable)
-  }
-
-  /**
-    * Returns the built-in rules that are defined by the environment.
-    */
-  protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
-
-  /**
-    * Generates the optimized [[RelNode]] tree from the original relational node tree.
-    *
-    * @param relNode The root node of the relational expression tree.
-    * @return The optimized [[RelNode]] tree
-    */
-  private[flink] def optimize(relNode: RelNode): RelNode = {
-    // decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
-
-    // optimize the logical Flink plan
-    val optProgram = Programs.ofRules(getRuleSet)
-    val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify()
-
-    val dataStreamPlan = try {
-      optProgram.run(getPlanner, decorPlan, flinkOutputProps)
-    }
-    catch {
-      case e: CannotPlanException =>
-        throw TableException(
-          s"Cannot generate a valid execution plan for the given query: \n\n" +
-            s"${RelOptUtil.toString(relNode)}\n" +
-            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
-            s"Please check the documentation for the set of currently supported SQL features.", e)
-    }
-    dataStreamPlan
-  }
-
-  /**
-    * Translates a [[Table]] into a [[DataStream]].
-    *
-    * The transformation involves optimizing the relational expression tree as defined by
-    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
-    *
-    * @param table The root node of the relational expression tree.
-    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
-    */
-  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
-    val dataStreamPlan = optimize(table.getRelNode)
-    translate(dataStreamPlan)
-  }
-
-  /**
-    * Translates a logical [[RelNode]] into a [[DataStream]].
-    *
-    * @param logicalPlan The root node of the relational expression tree.
-    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
-    * @tparam A The type of the resulting [[DataStream]].
-    * @return The [[DataStream]] that corresponds to the translated [[Table]].
-    */
-  protected def translate[A](logicalPlan: RelNode)
-      (implicit tpe: TypeInformation[A]): DataStream[A] = {
-
-    validateType(tpe)
-
-    logicalPlan match {
-      case node: DataStreamRel =>
-        node.translateToPlan(
-          this,
-          Some(tpe.asInstanceOf[TypeInformation[Any]])
-        ).asInstanceOf[DataStream[A]]
-      case _ =>
-        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
-          "This is a bug and should not happen. Please file an issue.")
-    }
-  }
-
-  /**
-    * Returns the AST of the given [[Table]] and the execution plan to compute
-    * the result of the [[Table]].
-    *
-    * @param table The table for which the AST and execution plan will be returned.
-    */
-  def explain(table: Table): String = {
-    val ast = table.getRelNode
-    val optimizedPlan = optimize(ast)
-    val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
-
-    val env = dataStream.getExecutionEnvironment
-    val jsonSqlPlan = env.getExecutionPlan
-
-    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
-
-    s"== Abstract Syntax Tree ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(ast)}" +
-        System.lineSeparator +
-        s"== Optimized Logical Plan ==" +
-        System.lineSeparator +
-        s"${RelOptUtil.toString(optimizedPlan)}" +
-        System.lineSeparator +
-        s"== Physical Execution Plan ==" +
-        System.lineSeparator +
-        s"$sqlPlan"
-  }
-
-}
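
For orientation, a minimal sketch of how the removed stream environment is driven
through the public API; the source data, field names, and query are illustrative,
and the imports assume the pre-refactoring package layout:

    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // registration goes through registerDataStreamInternal shown above
    val clicks: DataStream[(Long, String)] = env.fromElements((1L, "home"), (2L, "cart"))
    tEnv.registerDataStream("clicks", clicks, 'id, 'page)

    // sql() yields a Table; converting it back triggers optimize() and translate()
    val result = tEnv.sql("SELECT id, page FROM clicks WHERE id > 1")
    result.toDataStream[(Long, String)].print()

    env.execute()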

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
deleted file mode 100644
index 37d9cb5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import java.util.TimeZone
-
-/**
- * A config to define the runtime behavior of the Table API.
- */
-class TableConfig {
-
-  /**
-   * Defines the timezone for date/time/timestamp conversions.
-   */
-  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")
-
-  /**
-   * Defines whether all fields need to be checked for NULL first.
-   */
-  private var nullCheck: Boolean = true
-
-  /**
-    * Defines whether efficient types (such as Tuple types or Atomic types)
-    * should be used within operators where possible.
-    */
-  private var efficientTypeUsage = false
-
-  /**
-    * Defines the configuration of Calcite for Table API and SQL queries.
-    */
-  private var calciteConfig = CalciteConfig.DEFAULT
-
-  /**
-   * Sets the timezone for date/time/timestamp conversions.
-   */
-  def setTimeZone(timeZone: TimeZone): Unit = {
-    require(timeZone != null, "timeZone must not be null.")
-    this.timeZone = timeZone
-  }
-
-  /**
-   * Returns the timezone for date/time/timestamp conversions.
-   */
-  def getTimeZone: TimeZone = timeZone
-
-  /**
-   * Returns the NULL check. If enabled, all fields need to be checked for NULL first.
-   */
-  def getNullCheck: Boolean = nullCheck
-
-  /**
-   * Sets the NULL check. If enabled, all fields need to be checked for NULL first.
-   */
-  def setNullCheck(nullCheck: Boolean): Unit = {
-    this.nullCheck = nullCheck
-  }
-
-  /**
-    * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def getEfficientTypeUsage: Boolean = efficientTypeUsage
-
-  /**
-    * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types
-    * or Atomic types) are used within operators where possible.
-    *
-    * NOTE: Currently, this is an experimental feature.
-    */
-  def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
-    this.efficientTypeUsage = efficientTypeUsage
-  }
-
-  /**
-    * Returns the current configuration of Calcite for Table API and SQL queries.
-    */
-  def getCalciteConfig: CalciteConfig = calciteConfig
-
-  /**
-    * Sets the configuration of Calcite for Table API and SQL queries.
-    * Changing the configuration has no effect after the first query has been defined.
-    */
-  def setCalciteConfig(calciteConfig: CalciteConfig): Unit = {
-    this.calciteConfig = calciteConfig
-  }
-}
-
-object TableConfig {
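-  /** Returns a new [[TableConfig]] with default settings on every access. */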
-  def DEFAULT = new TableConfig()
-}
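
A short usage sketch of the removed config; the chosen values are illustrative:

    import java.util.TimeZone
    import org.apache.flink.api.table.TableConfig

    val config = new TableConfig
    // convert dates/times relative to Berlin instead of the UTC default
    config.setTimeZone(TimeZone.getTimeZone("Europe/Berlin"))
    // skip NULL checks when all inputs are known to be non-null
    config.setNullCheck(false)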

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
deleted file mode 100644
index 07ea860..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import java.lang.reflect.Modifier
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.calcite.config.Lex
-import org.apache.calcite.plan.RelOptPlanner
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.SqlOperatorTable
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.sql.util.ChainedSqlOperatorTable
-import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
-import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
-import org.apache.flink.api.table.codegen.ExpressionReducer
-import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.RelTable
-import org.apache.flink.api.table.sinks.TableSink
-import org.apache.flink.api.table.validate.FunctionCatalog
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
-import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
-
-import scala.collection.JavaConverters._
-
-/**
-  * The abstract base class for batch and stream TableEnvironments.
-  *
-  * @param config The configuration of the TableEnvironment
-  */
-abstract class TableEnvironment(val config: TableConfig) {
-
-  // the catalog to hold all registered and translated tables
-  private val tables: SchemaPlus = Frameworks.createRootSchema(true)
-
-  // Table API/SQL function catalog
-  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
-
-  // the configuration to create a Calcite planner
-  private lazy val frameworkConfig: FrameworkConfig = Frameworks
-    .newConfigBuilder
-    .defaultSchema(tables)
-    .parserConfig(getSqlParserConfig)
-    .costFactory(new DataSetCostFactory)
-    .typeSystem(new FlinkTypeSystem)
-    .operatorTable(getSqlOperatorTable)
-    // set the executor to evaluate constant expressions
-    .executor(new ExpressionReducer(config))
-    .build
-
-  // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
-  protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
-
-  // the planner instance used to optimize queries of this TableEnvironment
-  private lazy val planner: RelOptPlanner = relBuilder.getPlanner
-
-  private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
-
-  // a counter for unique attribute names
-  private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
-
-  /** Returns the table config to define the runtime behavior of the Table API. */
-  def getConfig: TableConfig = config
-
-  /**
-    * Returns the operator table for this environment including a custom Calcite configuration.
-    */
-  protected def getSqlOperatorTable: SqlOperatorTable = {
-    val calciteConfig = config.getCalciteConfig
-    calciteConfig.getSqlOperatorTable match {
-
-      case None =>
-        functionCatalog.getSqlOperatorTable
-
-      case Some(table) =>
-        if (calciteConfig.replacesSqlOperatorTable) {
-          table
-        } else {
-          ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, table)
-        }
-    }
-  }
-
-  /**
-    * Returns the rule set for this environment including a custom Calcite configuration.
-    */
-  protected def getRuleSet: RuleSet = {
-    val calciteConfig = config.getCalciteConfig
-    calciteConfig.getRuleSet match {
-
-      case None =>
-        getBuiltInRuleSet
-
-      case Some(ruleSet) =>
-        if (calciteConfig.replacesRuleSet) {
-          ruleSet
-        } else {
-          RuleSets.ofList((getBuiltInRuleSet.asScala ++ ruleSet.asScala).asJava)
-        }
-    }
-  }
-
-  /**
-    * Returns the SQL parser config for this environment including a custom Calcite configuration.
-    */
-  protected def getSqlParserConfig: SqlParser.Config = {
-    val calciteConfig = config.getCalciteConfig
-    calciteConfig.getSqlParserConfig match {
-
-      case None =>
-        // we use Java lexing because backticks are easier to use than double quotes
-        // in programs, and identifier casing is preserved
-        SqlParser
-          .configBuilder()
-          .setLex(Lex.JAVA)
-          .build()
-
-      case Some(sqlParserConfig) =>
-        sqlParserConfig
-    }
-  }
-
-  /**
-    * Returns the built-in rules that are defined by the environment.
-    */
-  protected def getBuiltInRuleSet: RuleSet
-
-  /**
-    * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
-    * user-defined functions under this name.
-    */
-  def registerFunction(name: String, function: ScalarFunction): Unit = {
-    // check that the class can be instantiated
-    checkForInstantiation(function.getClass)
-
-    // register in Table API
-    functionCatalog.registerFunction(name, function.getClass)
-
-    // register in SQL API
-    functionCatalog.registerSqlFunction(createScalarSqlFunction(name, function, typeFactory))
-  }
-
-  /**
-    * Registers a [[TableFunction]] under a unique name. Replaces already existing
-    * user-defined functions under this name.
-    */
-  private[flink] def registerTableFunctionInternal[T: TypeInformation](
-    name: String, function: TableFunction[T]): Unit = {
-    // check that the class is not a Scala object (singleton)
-    checkNotSingleton(function.getClass)
-    // check that the class can be instantiated
-    checkForInstantiation(function.getClass)
-
-    val typeInfo: TypeInformation[_] = if (function.getResultType != null) {
-      function.getResultType
-    } else {
-      implicitly[TypeInformation[T]]
-    }
-
-    // register in Table API
-    functionCatalog.registerFunction(name, function.getClass)
-
-    // register in SQL API
-    val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
-    functionCatalog.registerSqlFunctions(sqlFunctions)
-  }
-
-  /**
-    * Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
-    * Registered tables can be referenced in SQL queries.
-    *
-    * @param name The name under which the table is registered.
-    * @param table The table to register.
-    */
-  def registerTable(name: String, table: Table): Unit = {
-
-    // check that table belongs to this table environment
-    if (table.tableEnv != this) {
-      throw new TableException(
-        "Only tables that belong to this TableEnvironment can be registered.")
-    }
-
-    checkValidTableName(name)
-    val tableTable = new RelTable(table.getRelNode)
-    registerTableInternal(name, tableTable)
-  }
-
-  /**
-    * Replaces a registered Table with another Table under the same name.
-    * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]]
-    * with a [[org.apache.calcite.schema.TranslatableTable]].
-    *
-    * @param name Name of the table to replace.
-    * @param table The table that replaces the previous table.
-    */
-  protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
-
-    if (isRegistered(name)) {
-      tables.add(name, table)
-    } else {
-      throw new TableException(s"Table '$name' is not registered.")
-    }
-  }
-
-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    *
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  def sql(query: String): Table
-
-  /**
-    * Writes a [[Table]] to a [[TableSink]].
-    *
-    * @param table The [[Table]] to write.
-    * @param sink The [[TableSink]] to write the [[Table]] to.
-    * @tparam T The data type that the [[TableSink]] expects.
-    */
-  private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
-
-  /**
-    * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
-    *
-    * @param name The name under which the table is registered.
-    * @param table The table to register in the catalog
-    * @throws TableException if another table is registered under the provided name.
-    */
-  @throws[TableException]
-  protected def registerTableInternal(name: String, table: AbstractTable): Unit = {
-
-    if (isRegistered(name)) {
-      throw new TableException(s"Table '$name' already exists. " +
-        "Please choose a different name.")
-    } else {
-      tables.add(name, table)
-    }
-  }
-
-  /**
-    * Checks if the chosen table name is valid.
-    *
-    * @param name The table name to check.
-    */
-  protected def checkValidTableName(name: String): Unit
-
-  /**
-    * Checks if a table is registered under the given name.
-    *
-    * @param name The table name to check.
-    * @return true, if a table is registered under the name, false otherwise.
-    */
-  protected def isRegistered(name: String): Boolean = {
-    tables.getTableNames.contains(name)
-  }
-
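-  /** Returns the row type of the table registered under the given name. */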
-  protected def getRowType(name: String): RelDataType = {
-    tables.getTable(name).getRowType(typeFactory)
-  }
-
-  /** Returns a unique temporary attribute name. */
-  private[flink] def createUniqueAttributeName(): String = {
-    "TMP_" + attrNameCntr.getAndIncrement()
-  }
-
-  /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
-  private[flink] def getRelBuilder: FlinkRelBuilder = {
-    relBuilder
-  }
-
-  /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
-  private[flink] def getPlanner: RelOptPlanner = {
-    planner
-  }
-
-  /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
-  private[flink] def getTypeFactory: FlinkTypeFactory = {
-    typeFactory
-  }
-
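-  /** Returns the [[FunctionCatalog]] of this TableEnvironment. */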
-  private[flink] def getFunctionCatalog: FunctionCatalog = {
-    functionCatalog
-  }
-
-  /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
-  private[flink] def getFrameworkConfig: FrameworkConfig = {
-    frameworkConfig
-  }
-
-  protected def validateType(typeInfo: TypeInformation[_]): Unit = {
-    val clazz = typeInfo.getTypeClass
-    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
-        !Modifier.isPublic(clazz.getModifiers) ||
-        clazz.getCanonicalName == null) {
-      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
-        s"static and globally accessible.")
-    }
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]].
-    *
-    * Field names are automatically extracted for
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    * The method fails if inputType is not a
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    *
-    * @param inputType The TypeInformation to extract the field names and positions from.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  protected[flink] def getFieldInfo[A](
-    inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
-    validateType(inputType)
-
-    val fieldNames: Array[String] = inputType match {
-      case t: TupleTypeInfo[A] => t.getFieldNames
-      case c: CaseClassTypeInfo[A] => c.getFieldNames
-      case p: PojoTypeInfo[A] => p.getFieldNames
-      case r: RowTypeInfo => r.getFieldNames
-      case tpe =>
-        throw new TableException(s"Type $tpe lacks explicit field naming")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name cannot be '*'.")
-    }
-
-    (fieldNames, fieldIndexes)
-  }
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
-    * [[Expression]].
-    *
-    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
-    * @param exprs The expressions that define the field names.
-    * @tparam A The type of the TypeInformation.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  protected[flink] def getFieldInfo[A](
-    inputType: TypeInformation[A],
-    exprs: Array[Expression]): (Array[String], Array[Int]) = {
-
-    validateType(inputType)
-
-    val indexedNames: Array[(Int, String)] = inputType match {
-      case a: AtomicType[A] =>
-        if (exprs.length != 1) {
-          throw new TableException("Table of atomic type can only have a single field.")
-        }
-        exprs.map {
-          case UnresolvedFieldReference(name) => (0, name)
-          case _ => throw new TableException("Field reference expression expected.")
-        }
-      case t: TupleTypeInfo[A] =>
-        exprs.zipWithIndex.map {
-          case (UnresolvedFieldReference(name), idx) => (idx, name)
-          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-            val idx = t.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $t")
-            }
-            (idx, name)
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-      case c: CaseClassTypeInfo[A] =>
-        exprs.zipWithIndex.map {
-          case (UnresolvedFieldReference(name), idx) => (idx, name)
-          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-            val idx = c.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $c")
-            }
-            (idx, name)
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-      case p: PojoTypeInfo[A] =>
-        exprs.map {
-          case UnresolvedFieldReference(name) =>
-            val idx = p.getFieldIndex(name)
-            if (idx < 0) {
-              throw new TableException(s"$name is not a field of type $p")
-            }
-            (idx, name)
-          case Alias(UnresolvedFieldReference(origName), name, _) =>
-            val idx = p.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $p")
-            }
-            (idx, name)
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-      case tpe => throw new TableException(
-        s"Source of type $tpe cannot be converted into Table.")
-    }
-
-    val (fieldIndexes, fieldNames) = indexedNames.unzip
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name cannot be '*'.")
-    }
-
-    (fieldNames.toArray, fieldIndexes.toArray)
-  }
-
-}
-
-/**
-  * Object to instantiate a [[TableEnvironment]] depending on the batch or stream execution
-  * environment.
-  */
-object TableEnvironment {
-
-  /**
-    * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]].
-    *
-    * @param executionEnvironment The Java batch ExecutionEnvironment.
-    */
-  def getTableEnvironment(executionEnvironment: JavaBatchExecEnv): JavaBatchTableEnv = {
-    new JavaBatchTableEnv(executionEnvironment, new TableConfig())
-  }
-
-  /**
-    * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]] and a given [[TableConfig]].
-    *
-    * @param executionEnvironment The Java batch ExecutionEnvironment.
-    * @param tableConfig The TableConfig for the new TableEnvironment.
-    */
-  def getTableEnvironment(
-    executionEnvironment: JavaBatchExecEnv,
-    tableConfig: TableConfig): JavaBatchTableEnv = {
-
-    new JavaBatchTableEnv(executionEnvironment, tableConfig)
-  }
-
-  /**
-    * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]].
-    *
-    * @param executionEnvironment The Scala batch ExecutionEnvironment.
-    */
-  def getTableEnvironment(executionEnvironment: ScalaBatchExecEnv): ScalaBatchTableEnv = {
-    new ScalaBatchTableEnv(executionEnvironment, new TableConfig())
-  }
-
-  /**
-    * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]] and a given
-    * [[TableConfig]].
-    *
-    * @param executionEnvironment The Scala batch ExecutionEnvironment.
-    * @param tableConfig The TableConfig for the new TableEnvironment.
-    */
-  def getTableEnvironment(
-    executionEnvironment: ScalaBatchExecEnv,
-    tableConfig: TableConfig): ScalaBatchTableEnv = {
-
-    new ScalaBatchTableEnv(executionEnvironment, tableConfig)
-  }
-
-  /**
-    * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]].
-    *
-    * @param executionEnvironment The Java StreamExecutionEnvironment.
-    */
-  def getTableEnvironment(executionEnvironment: JavaStreamExecEnv): JavaStreamTableEnv = {
-    new JavaStreamTableEnv(executionEnvironment, new TableConfig())
-  }
-
-  /**
-    * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]] and a given [[TableConfig]].
-    *
-    * @param executionEnvironment The Java StreamExecutionEnvironment.
-    * @param tableConfig The TableConfig for the new TableEnvironment.
-    */
-  def getTableEnvironment(
-    executionEnvironment: JavaStreamExecEnv,
-    tableConfig: TableConfig): JavaStreamTableEnv = {
-
-    new JavaStreamTableEnv(executionEnvironment, tableConfig)
-  }
-
-  /**
-    * Returns a [[ScalaStreamTableEnv]] for a Scala [[ScalaStreamExecEnv]].
-    *
-    * @param executionEnvironment The Scala StreamExecutionEnvironment.
-    */
-  def getTableEnvironment(executionEnvironment: ScalaStreamExecEnv): ScalaStreamTableEnv = {
-    new ScalaStreamTableEnv(executionEnvironment, new TableConfig())
-  }
-
-  /**
-    * Returns a [[ScalaStreamTableEnv]] for a Scala [[ScalaStreamExecEnv]] and a given
-    * [[TableConfig]].
-    *
-    * @param executionEnvironment The Scala StreamExecutionEnvironment.
-    * @param tableConfig The TableConfig for the new TableEnvironment.
-    */
-  def getTableEnvironment(
-    executionEnvironment: ScalaStreamExecEnv,
-    tableConfig: TableConfig): ScalaStreamTableEnv = {
-
-    new ScalaStreamTableEnv(executionEnvironment, tableConfig)
-  }
-}
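
The overloads above resolve the TableEnvironment flavor from the static type of
the passed execution environment; a minimal sketch for the Scala batch case
(passing the explicit TableConfig is optional):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.api.table.{TableConfig, TableEnvironment}

    val env = ExecutionEnvironment.getExecutionEnvironment
    // equivalent to getTableEnvironment(env), which uses a default TableConfig
    val tEnv = TableEnvironment.getTableEnvironment(env, new TableConfig)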

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
deleted file mode 100644
index a988152..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-
-/**
-  * This class enumerates all supported types of the Table API.
-  */
-object Types {
-
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
-
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
-  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
-  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
-
-}
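
These constants are ordinary TypeInformation values; a small sketch that builds a
row type from them (the field layout is made up, and the varargs RowTypeInfo
constructor is assumed):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.table.Types

    // row schema: (id INT, name STRING, ts TIMESTAMP)
    val fieldTypes: Array[TypeInformation[_]] =
      Array(Types.INT, Types.STRING, Types.TIMESTAMP)
    val rowType = new RowTypeInfo(fieldTypes: _*)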

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala
deleted file mode 100644
index 8b7559f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-/**
-  * Exception for all errors occurring during code generation.
-  */
-class CodeGenException(msg: String) extends RuntimeException(msg)