Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:57 UTC

[28/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
new file mode 100644
index 0000000..4e43001
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql2rel.RelDecorrelator
+import org.apache.calcite.tools.{Programs, RuleSet}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.calcite.FlinkPlannerImpl
+import org.apache.flink.table.explain.PlanJsonParser
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
+import org.apache.flink.table.plan.rules.FlinkRuleSets
+import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
+import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.types.Row
+
+/**
+  * The base class for stream TableEnvironments.
+  *
+  * A TableEnvironment can be used to:
+  * - convert [[DataStream]] to a [[Table]]
+  * - register a [[DataStream]] as a table in the catalog
+  * - register a [[Table]] in the catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataStream]]
+  *
+  * @param execEnv The [[StreamExecutionEnvironment]] which is wrapped in this
+  *                [[StreamTableEnvironment]].
+  * @param config The [[TableConfig]] of this [[StreamTableEnvironment]].
+  */
+abstract class StreamTableEnvironment(
+    private[flink] val execEnv: StreamExecutionEnvironment,
+    config: TableConfig)
+  extends TableEnvironment(config) {
+
+  // a counter for unique table names
+  private val nameCntr: AtomicInteger = new AtomicInteger(0)
+
+  // the naming pattern for internally registered tables.
+  private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
+
+  /**
+    * Checks if the chosen table name is valid.
+    *
+    * @param name The table name to check.
+    */
+  override protected def checkValidTableName(name: String): Unit = {
+    val m = internalNamePattern.findFirstIn(name)
+    m match {
+      case Some(_) =>
+        throw new TableException(s"Illegal Table name. " +
+          s"Please choose a name that does not contain the pattern $internalNamePattern")
+      case None =>
+    }
+  }
+
+  /** Returns a unique table name according to the internal naming pattern. */
+  protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement()
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
+    : (Array[String], Array[Int]) = {
+    val fieldInfo = super.getFieldInfo(inputType)
+    if (fieldInfo._1.contains("rowtime")) {
+      throw new TableException("'rowtime' ia a reserved field name in stream environment.")
+    }
+    fieldInfo
+  }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
+    * [[Expression]].
+    *
+    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
+    * @param exprs     The expressions that define the field names.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  override protected[flink] def getFieldInfo[A](
+      inputType: TypeInformation[A],
+      exprs: Array[Expression])
+    : (Array[String], Array[Int]) = {
+    val fieldInfo = super.getFieldInfo(inputType, exprs)
+    if (fieldInfo._1.contains("rowtime")) {
+      throw new TableException("'rowtime' is a reserved field name in stream environment.")
+    }
+    fieldInfo
+  }
+
+  /**
+    * Ingests a registered table and returns the resulting [[Table]].
+    *
+    * The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
+    *
+    * @param tableName The name of the table to ingest.
+    * @throws ValidationException if no table is registered under the given name.
+    * @return The ingested table.
+    */
+  @throws[ValidationException]
+  def ingest(tableName: String): Table = {
+
+    if (isRegistered(tableName)) {
+      new Table(this, CatalogNode(tableName, getRowType(tableName)))
+    }
+    else {
+      throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
+    }
+  }
+
+  /**
+    * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * @param name        The name under which the [[StreamTableSource]] is registered.
+    * @param tableSource The [[org.apache.flink.table.sources.StreamTableSource]] to register.
+    */
+  def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = {
+
+    checkValidTableName(name)
+    registerTableInternal(name, new TableSourceTable(tableSource))
+  }
+
+  /**
+    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    *
+    * @param query The SQL query to evaluate.
+    * @return The result of the query as Table.
+    */
+  override def sql(query: String): Table = {
+
+    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+    // parse the sql query
+    val parsed = planner.parse(query)
+    // validate the sql query
+    val validated = planner.validate(parsed)
+    // transform to a relational tree
+    val relational = planner.rel(validated)
+
+    new Table(this, LogicalRelNode(relational.rel))
+  }
+
+  /**
+    * Writes a [[Table]] to a [[TableSink]].
+    *
+    * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
+    * [[TableSink]] to write it.
+    *
+    * @param table The [[Table]] to write.
+    * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
+    */
+  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+
+    sink match {
+      case streamSink: StreamTableSink[T] =>
+        val outputType = sink.getOutputType
+        // translate the Table into a DataStream and provide the type that the TableSink expects.
+        val result: DataStream[T] = translate(table)(outputType)
+        // Give the DataStream to the TableSink to emit it.
+        streamSink.emitDataStream(result)
+      case _ =>
+        throw new TableException("StreamTableSink required to emit streaming Table")
+    }
+  }
+
+  /**
+    * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerDataStreamInternal[T](
+    name: String,
+    dataStream: DataStream[T]): Unit = {
+
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
+    val dataStreamTable = new DataStreamTable[T](
+      dataStream,
+      fieldIndexes,
+      fieldNames
+    )
+    registerTableInternal(name, dataStreamTable)
+  }
+
+  /**
+    * Registers a [[DataStream]] as a table under a given name with field names as specified by
+    * field expressions in the [[TableEnvironment]]'s catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @param fields The field expressions to define the field names of the table.
+    * @tparam T The type of the [[DataStream]].
+    */
+  protected def registerDataStreamInternal[T](
+    name: String,
+    dataStream: DataStream[T],
+    fields: Array[Expression]): Unit = {
+
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray)
+    val dataStreamTable = new DataStreamTable[T](
+      dataStream,
+      fieldIndexes.toArray,
+      fieldNames.toArray
+    )
+    registerTableInternal(name, dataStreamTable)
+  }
+
+  /**
+    * Returns the built-in rules that are defined by the environment.
+    */
+  protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
+
+  /**
+    * Generates the optimized [[RelNode]] tree from the original relational node tree.
+    *
+    * @param relNode The root node of the relational expression tree.
+    * @return The optimized [[RelNode]] tree
+    */
+  private[flink] def optimize(relNode: RelNode): RelNode = {
+    // decorrelate
+    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
+
+    // optimize the logical Flink plan
+    val optProgram = Programs.ofRules(getRuleSet)
+    val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify()
+
+    val dataStreamPlan = try {
+      optProgram.run(getPlanner, decorPlan, flinkOutputProps)
+    }
+    catch {
+      case e: CannotPlanException =>
+        throw TableException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(relNode)}\n" +
+            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+            s"Please check the documentation for the set of currently supported SQL features.", e)
+    }
+    dataStreamPlan
+  }
+
+
+  /**
+    * Translates a [[Table]] into a [[DataStream]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    *
+    * @param table The root node of the relational expression tree.
+    * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+    val dataStreamPlan = optimize(table.getRelNode)
+    translate(dataStreamPlan)
+  }
+
+  /**
+    * Translates a logical [[RelNode]] into a [[DataStream]].
+    *
+    * @param logicalPlan The root node of the relational expression tree.
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataStream]].
+    * @tparam A The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A]
+      (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+    validateType(tpe)
+
+    logicalPlan match {
+      case node: DataStreamRel =>
+        node.translateToPlan(
+          this,
+          Some(tpe.asInstanceOf[TypeInformation[Any]])
+        ).asInstanceOf[DataStream[A]]
+      case _ =>
+        throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
+          "This is a bug and should not happen. Please file an issue.")
+    }
+  }
+
+  /**
+    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+    * the result of the given [[Table]].
+    *
+    * @param table The table for which the AST and execution plan will be returned.
+    */
+  def explain(table: Table): String = {
+    val ast = table.getRelNode
+    val optimizedPlan = optimize(ast)
+    val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+
+    val env = dataStream.getExecutionEnvironment
+    val jsonSqlPlan = env.getExecutionPlan
+
+    val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
+
+    s"== Abstract Syntax Tree ==" +
+        System.lineSeparator +
+        s"${RelOptUtil.toString(ast)}" +
+        System.lineSeparator +
+        s"== Optimized Logical Plan ==" +
+        System.lineSeparator +
+        s"${RelOptUtil.toString(optimizedPlan)}" +
+        System.lineSeparator +
+        s"== Physical Execution Plan ==" +
+        System.lineSeparator +
+        s"$sqlPlan"
+  }
+
+}
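
The flow above — register a DataStream, run sql(), inspect the plans with explain() — is easiest to see end-to-end. A minimal sketch, assuming the Scala-side StreamTableEnvironment from the same refactoring; the Orders stream and its field names are hypothetical:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    object StreamSqlSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // register a DataStream under a name; "rowtime" would be rejected as reserved
        val orders = env.fromElements(("beer", 3), ("diapers", 4))
        tEnv.registerDataStream("Orders", orders, 'product, 'amount)

        // sql() parses, validates, and converts the query into a logical plan
        val result = tEnv.sql("SELECT product, amount FROM Orders WHERE amount > 3")

        // explain() returns the AST, the optimized logical plan, and the physical plan
        println(tEnv.explain(result))
      }
    }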

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
new file mode 100644
index 0000000..a8876a8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import _root_.java.util.TimeZone
+
+import org.apache.flink.table.calcite.CalciteConfig
+
+/**
+ * A config to define the runtime behavior of the Table API.
+ */
+class TableConfig {
+
+  /**
+   * Defines the timezone for date/time/timestamp conversions.
+   */
+  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")
+
+  /**
+   * Defines if all fields need to be checked for NULL first.
+   */
+  private var nullCheck: Boolean = true
+
+  /**
+    * Defines if efficient types (such as Tuple types or Atomic types)
+    * should be used within operators where possible.
+    */
+  private var efficientTypeUsage = false
+
+  /**
+    * Defines the configuration of Calcite for Table API and SQL queries.
+    */
+  private var calciteConfig = CalciteConfig.DEFAULT
+
+  /**
+   * Sets the timezone for date/time/timestamp conversions.
+   */
+  def setTimeZone(timeZone: TimeZone): Unit = {
+    require(timeZone != null, "timeZone must not be null.")
+    this.timeZone = timeZone
+  }
+
+  /**
+   * Returns the timezone for date/time/timestamp conversions.
+   */
+  def getTimeZone = timeZone
+
+  /**
+   * Returns the NULL check. If enabled, all fields need to be checked for NULL first.
+   */
+  def getNullCheck = nullCheck
+
+  /**
+   * Sets the NULL check. If enabled, all fields need to be checked for NULL first.
+   */
+  def setNullCheck(nullCheck: Boolean): Unit = {
+    this.nullCheck = nullCheck
+  }
+
+  /**
+    * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types
+    * or Atomic types) are used within operators where possible.
+    *
+    * NOTE: Currently, this is an experimental feature.
+    */
+  def getEfficientTypeUsage = efficientTypeUsage
+
+  /**
+    * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types
+    * or Atomic types) are used within operators where possible.
+    *
+    * NOTE: Currently, this is an experimental feature.
+    */
+  def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
+    this.efficientTypeUsage = efficientTypeUsage
+  }
+
+  /**
+    * Returns the current configuration of Calcite for Table API and SQL queries.
+    */
+  def getCalciteConfig: CalciteConfig = calciteConfig
+
+  /**
+    * Sets the configuration of Calcite for Table API and SQL queries.
+    * Changing the configuration has no effect after the first query has been defined.
+    */
+  def setCalciteConfig(calciteConfig: CalciteConfig): Unit = {
+    this.calciteConfig = calciteConfig
+  }
+}
+
+object TableConfig {
+  def DEFAULT = new TableConfig()
+}
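
Since all four settings are plain setters, a custom configuration is just a mutated instance that is later handed to a TableEnvironment factory. A minimal sketch; the chosen values are illustrative only:

    import _root_.java.util.TimeZone
    import org.apache.flink.table.api.TableConfig

    object TableConfigSketch {
      def customConfig(): TableConfig = {
        val config = new TableConfig
        config.setTimeZone(TimeZone.getTimeZone("Europe/Berlin")) // default is UTC
        config.setNullCheck(true)            // check all fields for NULL first
        config.setEfficientTypeUsage(false)  // experimental, off by default
        config
      }
    }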

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
new file mode 100644
index 0000000..769008f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.lang.reflect.Modifier
+
+import org.apache.calcite.config.Lex
+import org.apache.calcite.plan.RelOptPlanner
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
+import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.codegen.ExpressionReducer
+import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.plan.cost.DataSetCostFactory
+import org.apache.flink.table.plan.schema.RelTable
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.validate.FunctionCatalog
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * The abstract base class for batch and stream TableEnvironments.
+  *
+  * @param config The configuration of the TableEnvironment
+  */
+abstract class TableEnvironment(val config: TableConfig) {
+
+  // the catalog to hold all registered and translated tables
+  private val tables: SchemaPlus = Frameworks.createRootSchema(true)
+
+  // Table API/SQL function catalog
+  private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
+
+  // the configuration to create a Calcite planner
+  private lazy val frameworkConfig: FrameworkConfig = Frameworks
+    .newConfigBuilder
+    .defaultSchema(tables)
+    .parserConfig(getSqlParserConfig)
+    .costFactory(new DataSetCostFactory)
+    .typeSystem(new FlinkTypeSystem)
+    .operatorTable(getSqlOperatorTable)
+    // set the executor to evaluate constant expressions
+    .executor(new ExpressionReducer(config))
+    .build
+
+  // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
+  protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
+
+  // the planner instance used to optimize queries of this TableEnvironment
+  private lazy val planner: RelOptPlanner = relBuilder.getPlanner
+
+  private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
+
+  // a counter for unique attribute names
+  private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
+
+  /** Returns the table config to define the runtime behavior of the Table API. */
+  def getConfig = config
+
+  /**
+    * Returns the operator table for this environment including a custom Calcite configuration.
+    */
+  protected def getSqlOperatorTable: SqlOperatorTable = {
+    val calciteConfig = config.getCalciteConfig
+    calciteConfig.getSqlOperatorTable match {
+
+      case None =>
+        functionCatalog.getSqlOperatorTable
+
+      case Some(table) =>
+        if (calciteConfig.replacesSqlOperatorTable) {
+          table
+        } else {
+          ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, table)
+        }
+    }
+  }
+
+  /**
+    * Returns the rule set for this environment including a custom Calcite configuration.
+    */
+  protected def getRuleSet: RuleSet = {
+    val calciteConfig = config.getCalciteConfig
+    calciteConfig.getRuleSet match {
+
+      case None =>
+        getBuiltInRuleSet
+
+      case Some(ruleSet) =>
+        if (calciteConfig.replacesRuleSet) {
+          ruleSet
+        } else {
+          RuleSets.ofList((getBuiltInRuleSet.asScala ++ ruleSet.asScala).asJava)
+        }
+    }
+  }
+
+  /**
+    * Returns the SQL parser config for this environment including a custom Calcite configuration.
+    */
+  protected def getSqlParserConfig: SqlParser.Config = {
+    val calciteConfig = config.getCalciteConfig
+    calciteConfig.getSqlParserConfig match {
+
+      case None =>
+        // we use Java lex because back ticks are easier than double quotes in programming
+        // and cases are preserved
+        SqlParser
+          .configBuilder()
+          .setLex(Lex.JAVA)
+          .build()
+
+      case Some(sqlParserConfig) =>
+        sqlParserConfig
+    }
+  }
+
+  /**
+    * Returns the built-in rules that are defined by the environment.
+    */
+  protected def getBuiltInRuleSet: RuleSet
+
+  /**
+    * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
+    * user-defined functions under this name.
+    */
+  def registerFunction(name: String, function: ScalarFunction): Unit = {
+    // check if class could be instantiated
+    checkForInstantiation(function.getClass)
+
+    // register in Table API
+    functionCatalog.registerFunction(name, function.getClass)
+
+    // register in SQL API
+    functionCatalog.registerSqlFunction(createScalarSqlFunction(name, function, typeFactory))
+  }
+
+  /**
+    * Registers a [[TableFunction]] under a unique name. Replaces already existing
+    * user-defined functions under this name.
+    */
+  private[flink] def registerTableFunctionInternal[T: TypeInformation](
+    name: String, function: TableFunction[T]): Unit = {
+    // check if class not Scala object
+    checkNotSingleton(function.getClass)
+    // check if class could be instantiated
+    checkForInstantiation(function.getClass)
+
+    val typeInfo: TypeInformation[_] = if (function.getResultType != null) {
+      function.getResultType
+    } else {
+      implicitly[TypeInformation[T]]
+    }
+
+    // register in Table API
+    functionCatalog.registerFunction(name, function.getClass)
+
+    // register in SQL API
+    val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
+    functionCatalog.registerSqlFunctions(sqlFunctions)
+  }
+
+  /**
+    * Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * @param name The name under which the table is registered.
+    * @param table The table to register.
+    */
+  def registerTable(name: String, table: Table): Unit = {
+
+    // check that table belongs to this table environment
+    if (table.tableEnv != this) {
+      throw new TableException(
+        "Only tables that belong to this TableEnvironment can be registered.")
+    }
+
+    checkValidTableName(name)
+    val tableTable = new RelTable(table.getRelNode)
+    registerTableInternal(name, tableTable)
+  }
+
+  /**
+    * Replaces a registered Table with another Table under the same name.
+    * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
+    * with a [[org.apache.calcite.schema.TranslatableTable]].
+    *
+    * @param name Name of the table to replace.
+    * @param table The table that replaces the previous table.
+    */
+  protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
+
+    if (isRegistered(name)) {
+      tables.add(name, table)
+    } else {
+      throw new TableException(s"Table \'$name\' is not registered.")
+    }
+  }
+
+  /**
+    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    *
+    * @param query The SQL query to evaluate.
+    * @return The result of the query as Table.
+    */
+  def sql(query: String): Table
+
+  /**
+    * Writes a [[Table]] to a [[TableSink]].
+    *
+    * @param table The [[Table]] to write.
+    * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @tparam T The data type that the [[TableSink]] expects.
+    */
+  private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
+
+  /**
+    * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
+    *
+    * @param name The name under which the table is registered.
+    * @param table The table to register in the catalog
+    * @throws TableException if another table is registered under the provided name.
+    */
+  @throws[TableException]
+  protected def registerTableInternal(name: String, table: AbstractTable): Unit = {
+
+    if (isRegistered(name)) {
+      throw new TableException(s"Table \'$name\' already exists. " +
+        s"Please, choose a different name.")
+    } else {
+      tables.add(name, table)
+    }
+  }
+
+  /**
+    * Checks if the chosen table name is valid.
+    *
+    * @param name The table name to check.
+    */
+  protected def checkValidTableName(name: String): Unit
+
+  /**
+    * Checks if a table is registered under the given name.
+    *
+    * @param name The table name to check.
+    * @return true, if a table is registered under the name, false otherwise.
+    */
+  protected def isRegistered(name: String): Boolean = {
+    tables.getTableNames.contains(name)
+  }
+
+  protected def getRowType(name: String): RelDataType = {
+    tables.getTable(name).getRowType(typeFactory)
+  }
+
+  /** Returns a unique temporary attribute name. */
+  private[flink] def createUniqueAttributeName(): String = {
+    "TMP_" + attrNameCntr.getAndIncrement()
+  }
+
+  /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
+  private[flink] def getRelBuilder: FlinkRelBuilder = {
+    relBuilder
+  }
+
+  /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
+  private[flink] def getPlanner: RelOptPlanner = {
+    planner
+  }
+
+  /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
+  private[flink] def getTypeFactory: FlinkTypeFactory = {
+    typeFactory
+  }
+
+  private[flink] def getFunctionCatalog: FunctionCatalog = {
+    functionCatalog
+  }
+
+  /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
+  private[flink] def getFrameworkConfig: FrameworkConfig = {
+    frameworkConfig
+  }
+
+  protected def validateType(typeInfo: TypeInformation[_]): Unit = {
+    val clazz = typeInfo.getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+        !Modifier.isPublic(clazz.getModifiers) ||
+        clazz.getCanonicalName == null) {
+      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+        s"static and globally accessible.")
+    }
+  }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
+      (Array[String], Array[Int]) =
+  {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
+      case tpe =>
+        throw new TableException(s"Type $tpe lacks explicit field naming")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    (fieldNames, fieldIndexes)
+  }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
+    * [[Expression]].
+    *
+    * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
+    * @param exprs The expressions that define the field names.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  protected[flink] def getFieldInfo[A](
+    inputType: TypeInformation[A],
+    exprs: Array[Expression]): (Array[String], Array[Int]) = {
+
+    validateType(inputType)
+
+    val indexedNames: Array[(Int, String)] = inputType match {
+      case a: AtomicType[A] =>
+        if (exprs.length != 1) {
+          throw new TableException("Table of atomic type can only have a single field.")
+        }
+        exprs.map {
+          case UnresolvedFieldReference(name) => (0, name)
+          case _ => throw new TableException("Field reference expression expected.")
+        }
+      case t: TupleTypeInfo[A] =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+            val idx = t.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new TableException(s"$origName is not a field of type $t")
+            }
+            (idx, name)
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
+        }
+      case c: CaseClassTypeInfo[A] =>
+        exprs.zipWithIndex.map {
+          case (UnresolvedFieldReference(name), idx) => (idx, name)
+          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+            val idx = c.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new TableException(s"$origName is not a field of type $c")
+            }
+            (idx, name)
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
+        }
+      case p: PojoTypeInfo[A] =>
+        exprs.map {
+          case (UnresolvedFieldReference(name)) =>
+            val idx = p.getFieldIndex(name)
+            if (idx < 0) {
+              throw new TableException(s"$name is not a field of type $p")
+            }
+            (idx, name)
+          case Alias(UnresolvedFieldReference(origName), name, _) =>
+            val idx = p.getFieldIndex(origName)
+            if (idx < 0) {
+              throw new TableException(s"$origName is not a field of type $p")
+            }
+            (idx, name)
+          case _ => throw new TableException(
+            "Field reference expression or alias on field expression expected.")
+        }
+      case tpe => throw new TableException(
+        s"Source of type $tpe cannot be converted into Table.")
+    }
+
+    val (fieldIndexes, fieldNames) = indexedNames.unzip
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    (fieldNames.toArray, fieldIndexes.toArray)
+  }
+
+}
+
+/**
+  * Object to instantiate a [[TableEnvironment]] depending on the batch or stream execution
+  * environment.
+  */
+object TableEnvironment {
+
+  /**
+    * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]].
+    *
+    * @param executionEnvironment The Java batch ExecutionEnvironment.
+    */
+  def getTableEnvironment(executionEnvironment: JavaBatchExecEnv): JavaBatchTableEnv = {
+    new JavaBatchTableEnv(executionEnvironment, new TableConfig())
+  }
+
+  /**
+    * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]] and a given [[TableConfig]].
+    *
+    * @param executionEnvironment The Java batch ExecutionEnvironment.
+    * @param tableConfig The TableConfig for the new TableEnvironment.
+    */
+  def getTableEnvironment(
+    executionEnvironment: JavaBatchExecEnv,
+    tableConfig: TableConfig): JavaBatchTableEnv = {
+
+    new JavaBatchTableEnv(executionEnvironment, tableConfig)
+  }
+
+  /**
+    * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]].
+    *
+    * @param executionEnvironment The Scala batch ExecutionEnvironment.
+    */
+  def getTableEnvironment(executionEnvironment: ScalaBatchExecEnv): ScalaBatchTableEnv = {
+    new ScalaBatchTableEnv(executionEnvironment, new TableConfig())
+  }
+
+  /**
+    * Returns a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]] and a given
+    * [[TableConfig]].
+    *
+    * @param executionEnvironment The Scala batch ExecutionEnvironment.
+    * @param tableConfig The TableConfig for the new TableEnvironment.
+    */
+  def getTableEnvironment(
+    executionEnvironment: ScalaBatchExecEnv,
+    tableConfig: TableConfig): ScalaBatchTableEnv = {
+
+    new ScalaBatchTableEnv(executionEnvironment, tableConfig)
+  }
+
+  /**
+    * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]].
+    *
+    * @param executionEnvironment The Java StreamExecutionEnvironment.
+    */
+  def getTableEnvironment(executionEnvironment: JavaStreamExecEnv): JavaStreamTableEnv = {
+    new JavaStreamTableEnv(executionEnvironment, new TableConfig())
+  }
+
+  /**
+    * Returns a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]] and a given [[TableConfig]].
+    *
+    * @param executionEnvironment The Java StreamExecutionEnvironment.
+    * @param tableConfig The TableConfig for the new TableEnvironment.
+    */
+  def getTableEnvironment(
+    executionEnvironment: JavaStreamExecEnv,
+    tableConfig: TableConfig): JavaStreamTableEnv = {
+
+    new JavaStreamTableEnv(executionEnvironment, tableConfig)
+  }
+
+  /**
+    * Returns a [[ScalaStreamTableEnv]] for a Scala stream [[ScalaStreamExecEnv]].
+    *
+    * @param executionEnvironment The Scala StreamExecutionEnvironment.
+    */
+  def getTableEnvironment(executionEnvironment: ScalaStreamExecEnv): ScalaStreamTableEnv = {
+    new ScalaStreamTableEnv(executionEnvironment, new TableConfig())
+  }
+
+  /**
+    * Returns a [[ScalaStreamTableEnv]] for a Scala stream [[ScalaStreamExecEnv]] and a given
+    * [[TableConfig]].
+    *
+    * @param executionEnvironment The Scala StreamExecutionEnvironment.
+    * @param tableConfig The TableConfig for the new TableEnvironment.
+    */
+  def getTableEnvironment(
+    executionEnvironment: ScalaStreamExecEnv,
+    tableConfig: TableConfig): ScalaStreamTableEnv = {
+
+    new ScalaStreamTableEnv(executionEnvironment, tableConfig)
+  }
+}
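
Overload resolution on the execution environment type picks one of the four concrete environments. A minimal sketch for the stream/Scala pairing; the other three pairings follow the same pattern:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{TableConfig, TableEnvironment}

    object FactorySketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // static overload resolution selects the Scala stream TableEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env, new TableConfig)
        println(tEnv.getConfig.getTimeZone.getID)
      }
    }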

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
new file mode 100644
index 0000000..939cb67
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+/**
+  * This object enumerates all supported types of the Table API.
+  */
+object Types {
+
+  val STRING = BasicTypeInfo.STRING_TYPE_INFO
+  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
+  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
+  val INT = BasicTypeInfo.INT_TYPE_INFO
+  val LONG = BasicTypeInfo.LONG_TYPE_INFO
+  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
+  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
+  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+
+  val DATE = SqlTimeTypeInfo.DATE
+  val TIME = SqlTimeTypeInfo.TIME
+  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
+  val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+}
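
The constants are ordinary TypeInformation values and can be used anywhere a TypeInformation is expected. A minimal sketch with a hypothetical order schema:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types

    object TypesSketch {
      // field name -> Table API type constant, for a hypothetical schema
      val orderSchema: Seq[(String, TypeInformation[_])] = Seq(
        "product" -> Types.STRING,
        "amount"  -> Types.INT,
        "ts"      -> Types.TIMESTAMP
      )
    }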

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
new file mode 100644
index 0000000..7ca45a6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+/**
+  * Exception for all errors occurring during expression parsing.
+  */
+case class ExpressionParserException(msg: String) extends RuntimeException(msg)
+
+/**
+  * Exception for all errors occurring during SQL parsing.
+  */
+case class SqlParserException(
+    msg: String,
+    cause: Throwable)
+  extends RuntimeException(msg, cause) {
+
+  def this(msg: String) = this(msg, null)
+
+}
+
+/**
+  * General Exception for all errors during table handling.
+  */
+case class TableException(
+    msg: String,
+    cause: Throwable)
+  extends RuntimeException(msg, cause) {
+
+  def this(msg: String) = this(msg, null)
+
+}
+
+object TableException {
+  def apply(msg: String): TableException = new TableException(msg)
+}
+
+/**
+  * Exception for all errors occurring during validation phase.
+  */
+case class ValidationException(
+    msg: String,
+    cause: Throwable)
+  extends RuntimeException(msg, cause) {
+
+  def this(msg: String) = this(msg, null)
+
+}
+
+object ValidationException {
+  def apply(msg: String): ValidationException = new ValidationException(msg)
+}
+
+/**
+  * Exception for unexpected method calls on an unresolved expression.
+  */
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
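
Because TableException and ValidationException are case classes with single-argument companion apply methods, both construction forms below build the same exception. A minimal sketch; the helper names are hypothetical:

    import org.apache.flink.table.api.{TableException, ValidationException}

    object ExceptionSketch {
      def requireRegistered(isRegistered: Boolean, name: String): Unit = {
        if (!isRegistered) {
          // companion apply, equivalent to new ValidationException(msg, null)
          throw ValidationException(s"Table '$name' was not found in the registry.")
        }
      }

      def failUnsupported(feature: String): Nothing =
        throw new TableException(s"Unsupported feature: $feature", null)
    }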

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
new file mode 100644
index 0000000..15e842e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.java
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.TableFunction
+
+/**
+  * The [[TableEnvironment]] for a Java batch [[ExecutionEnvironment]] that
+  * operates on [[DataSet]]s.
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataSet]] to a [[Table]]
+  * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataSet]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Java batch [[ExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class BatchTableEnvironment(
+    execEnv: ExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.table.api.BatchTableEnvironment(execEnv, config) {
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet)
+    scan(name)
+  }
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataSet<Tuple2<String, Long>> set = ...
+    *   Table tab = tableEnv.fromDataSet(set, "a, b")
+    * }}}
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T], fields: String): Table = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet, exprs)
+    scan(name)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as table in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as table with specified field names in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataSet<Tuple2<String, Long>> set = ...
+    *   tableEnv.registerDataSet("myTable", set, "a, b")
+    * }}}
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: String): Unit = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet, exprs)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataSet]].
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the resulting [[DataSet]].
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
+    translate[T](table)(typeInfo)
+  }
+
+  /**
+    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in Table API and SQL queries.
+    *
+    * @param name The name under which the function is registered.
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
+    */
+  def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
+    implicit val typeInfo: TypeInformation[T] = TypeExtractor
+      .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
+      .asInstanceOf[TypeInformation[T]]
+
+    registerTableFunctionInternal[T](name, tf)
+  }
+}
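
registerFunction derives the output row type of a TableFunction with the TypeExtractor, so a plain subclass is enough. A minimal sketch; the Split function and its registered name are hypothetical:

    import org.apache.flink.table.functions.TableFunction

    // emits one row per comma-separated token of the input line
    class Split extends TableFunction[String] {
      def eval(line: String): Unit = {
        line.split(",").foreach(collect)
      }
    }

    // registration against a BatchTableEnvironment tEnv:
    //   tEnv.registerFunction("split", new Split)
    // the output type (String) is derived via TypeExtractor as shown above.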

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
new file mode 100644
index 0000000..428dcae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.java
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+/**
+  * The [[TableEnvironment]] for a Java [[StreamExecutionEnvironment]].
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataStream]] to a [[Table]]
+  * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataStream]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Java [[StreamExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class StreamTableEnvironment(
+    execEnv: StreamExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.table.api.StreamTableEnvironment(execEnv, config) {
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the
+    * [[DataStream]].
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream)
+    ingest(name)
+  }
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataStream<Tuple2<String, Long>> stream = ...
+    *   Table tab = tableEnv.fromDataStream(stream, "a, b")
+    * }}}
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T], fields: String): Table = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream, exprs)
+    ingest(name)
+  }
+
+  /**
+    * Registers the given [[DataStream]] as table in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived
+    * from the type of the [[DataStream]].
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream)
+  }
+
+  /**
+    * Registers the given [[DataStream]] as a table with the specified field names in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   DataStream<Tuple2<String, Long>> stream = ...
+    *   tableEnv.registerDataStream("myTable", stream, "a, b")
+    * }}}
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: String): Unit = {
+    val exprs = ExpressionParser
+      .parseExpressionList(fields)
+      .toArray
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream, exprs)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param clazz The class of the type of the resulting [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    translate[T](table)(TypeExtractor.createTypeInfo(clazz))
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] = {
+    translate[T](table)(typeInfo)
+  }
+
+  /**
+    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in Table API and SQL queries.
+    *
+    * @param name The name under which the function is registered.
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
+    */
+  def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
+    implicit val typeInfo: TypeInformation[T] = TypeExtractor
+      .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
+      .asInstanceOf[TypeInformation[T]]
+
+    registerTableFunctionInternal[T](name, tf)
+  }
+}
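
A minimal sketch of how the method above could be used from a program: the `Split` function, the
`tableEnv` value, and the input data are hypothetical, but the result type of the function is
derived exactly as in `registerFunction`, via `TypeExtractor.createTypeInfo`.

    import org.apache.flink.table.functions.TableFunction

    // Hypothetical UDTF that emits one row per whitespace-separated token.
    class Split extends TableFunction[String] {
      def eval(s: String): Unit = s.split(" ").foreach(collect)
    }

    // No TypeInformation is passed explicitly; the environment extracts
    // TypeInformation[String] for the output rows as shown above.
    tableEnv.registerFunction("split", new Split)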

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
new file mode 100644
index 0000000..9c82e9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java
+
+import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+  * Helper object for creating a tumbling window. Tumbling windows are consecutive,
+  * non-overlapping windows of a specified fixed length. For example, a tumbling window
+  * with a size of 5 minutes groups elements into intervals of 5 minutes.
+  */
+object Tumble {
+
+  /**
+    * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
+    * windows of a specified fixed length. For example, a tumbling window with a size of
+    * 5 minutes groups elements into intervals of 5 minutes.
+    *
+    * @param size the size of the window as time or row-count interval.
+    * @return a tumbling window
+    */
+  def over(size: String): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+  * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
+  * a specified slide interval. If the slide interval is smaller than the window size, sliding
+  * windows overlap, i.e., an element can be assigned to multiple windows.
+  *
+  * For example, a sliding window of size 15 minutes with a slide interval of 5 minutes groups
+  * elements of the last 15 minutes and evaluates every 5 minutes. Each element is contained in
+  * three consecutive window evaluations.
+  */
+object Slide {
+
+  /**
+    * Creates a sliding window. Sliding windows have a fixed size and slide by
+    * a specified slide interval. If the slide interval is smaller than the window size, sliding
+    * windows overlap, i.e., an element can be assigned to multiple windows.
+    *
+    * For example, a sliding window of size 15 minutes with a slide interval of 5 minutes groups
+    * elements of the last 15 minutes and evaluates every 5 minutes. Each element is contained
+    * in three consecutive window evaluations.
+    *
+    * @param size the size of the window as time or row-count interval
+    * @return a partially specified sliding window
+    */
+  def over(size: String): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+  * Helper object for creating a session window. The boundaries of session windows are defined
+  * by intervals of inactivity, i.e., a session window is closed if no event appears for a
+  * defined gap period.
+  */
+object Session {
+
+  /**
+    * Creates a session window. The boundaries of session windows are defined by
+    * intervals of inactivity, i.e., a session window is closed if no event appears for a
+    * defined gap period.
+    *
+    * @param gap specifies how long (as an interval of milliseconds) to wait for new data before
+    *            closing the session window.
+    * @return a session window
+    */
+  def withGap(gap: String): SessionWindow = new SessionWindow(gap)
+}
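
All three helpers build a window specification from a string-based interval; the specification is
then passed to the windowing clause of the Table API. A minimal sketch, assuming interval strings
such as "10.minutes" and the `every(...)` method that completes a `SlideWithSize`:

    // Hypothetical window specifications built from string intervals.
    val tumbling = Tumble.over("10.minutes")                    // consecutive, non-overlapping
    val sliding  = Slide.over("15.minutes").every("5.minutes")  // overlapping, evaluates every 5 minutes
    val session  = Session.withGap("30.minutes")                // closes after 30 minutes of inactivity

Note that `Slide.over(...)` alone is only a partially specified window; the slide interval must be
supplied before the window can be used in a query.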

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala
new file mode 100644
index 0000000..e16e1a8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+/**
+ * == Table API ==
+ *
+ * This package contains the Table API. It can be used with Flink Streaming
+ * and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a [[org.apache.flink.table.api.Table]] from
+ * a DataSet or DataStream. Relational operations can be performed on this table. A table
+ * can also be converted back to a DataSet or DataStream.
+ *
+ * Packages [[org.apache.flink.table.api.scala]] and [[org.apache.flink.table.api.java]] contain
+ * the language specific part of the API. Refer to these packages for documentation on how
+ * the Table API can be used in Java and Scala.
+ */
+package object api

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
new file mode 100644
index 0000000..3ae8c31
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.TableFunction
+
+import _root_.scala.reflect.ClassTag
+
+/**
+  * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works
+  * with [[DataSet]]s.
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataSet]] to a [[Table]]
+  * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataSet]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Scala batch [[ExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class BatchTableEnvironment(
+    execEnv: ExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.table.api.BatchTableEnvironment(execEnv.getJavaEnv, config) {
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet.javaSet)
+    scan(name)
+  }
+
+  /**
+    * Converts the given [[DataSet]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   val set: DataSet[(String, Long)] = ...
+    *   val tab: Table = tableEnv.fromDataSet(set, 'a, 'b)
+    * }}}
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T], fields: Expression*): Table = {
+
+    val name = createUniqueTableName()
+    registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
+    scan(name)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as a table in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet.javaSet)
+  }
+
+  /**
+    * Registers the given [[DataSet]] as a table with the specified field names in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   val set: DataSet[(String, Long)] = ...
+    *   tableEnv.registerDataSet("myTable", set, 'a, 'b)
+    * }}}
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: Expression*): Unit = {
+
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
+    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+  }
+
+  /**
+    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in Table API and SQL queries.
+    *
+    * @param name The name under which the function is registered.
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
+    */
+  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
+    registerTableFunctionInternal(name, tf)
+  }
+}
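
A minimal end-to-end sketch for this environment; the data, the table name, and the field names
are made up, and `sql(...)` is assumed to be the SQL entry point of this version:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.{Table, TableEnvironment}
    import org.apache.flink.table.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical input data.
    val orders: DataSet[(String, Int)] = env.fromElements(("alice", 10), ("bob", 20))

    // Register the DataSet, query it with SQL, and convert the result back.
    tEnv.registerDataSet("orders", orders, 'name, 'amount)
    val large: Table = tEnv.sql("SELECT name, amount FROM orders WHERE amount > 15")
    val result: DataSet[(String, Int)] = tEnv.toDataSet[(String, Int)](large)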

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
new file mode 100644
index 0000000..4b92bdb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Holds methods to convert a [[DataSet]] into a [[Table]].
+  *
+  * @param dataSet The [[DataSet]] to convert.
+  * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
+  * @tparam T The type of the [[DataSet]].
+  */
+class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
+
+  /**
+    * Converts the [[DataSet]] into a [[Table]].
+    *
+    * The field names of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = ExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val set: DataSet[(String, Int)] = ...
+    *   val table = set.toTable(tEnv, 'name, 'amount)
+    * }}}
+    *
+    * If not explicitly specified, field names are automatically extracted from the type of
+    * the [[DataSet]].
+    *
+    * @param tableEnv The [[BatchTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toTable(tableEnv: BatchTableEnvironment, fields: Expression*): Table = {
+    if (fields.isEmpty) {
+      tableEnv.fromDataSet(dataSet)
+    } else {
+      tableEnv.fromDataSet(dataSet, fields: _*)
+    }
+  }
+
+}
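
The class is not instantiated directly: the implicit wrapping of a DataSet into a
`DataSetConversions` is expected to come from the `org.apache.flink.table.api.scala` package
object. A small sketch under that assumption, with made-up data:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._  // enables dataSet.toTable(...)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val set: DataSet[(String, Int)] = env.fromElements(("a", 1))
    val named   = set.toTable(tEnv, 'name, 'amount)  // explicit field names
    val derived = set.toTable(tEnv)                  // names derived from the tuple type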
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
new file mode 100644
index 0000000..89f7627
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+  * Holds methods to convert a [[DataStream]] into a [[Table]].
+  *
+  * @param dataStream The [[DataStream]] to convert.
+  * @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
+  * @tparam T The type of the [[DataStream]].
+  */
+class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
+
+  /**
+    * Converts the [[DataStream]] into a [[Table]].
+    *
+    * The field names of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(String, Int)] = ...
+    *   val table = stream.toTable(tEnv, 'name, 'amount)
+    * }}}
+    *
+    * If not explicitly specified, field names are automatically extracted from the type of
+    * the [[DataStream]].
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {
+    if (fields.isEmpty) {
+      tableEnv.fromDataStream(dataStream)
+    } else {
+      tableEnv.fromDataStream(dataStream, fields: _*)
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
new file mode 100644
index 0000000..1e6749e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.asScalaStream
+
+/**
+  * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]].
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataStream]] to a [[Table]]
+  * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataStream]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class StreamTableEnvironment(
+    execEnv: StreamExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.table.api.StreamTableEnvironment(
+    execEnv.getWrappedStreamExecutionEnvironment,
+    config) {
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]].
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the
+    * [[DataStream]].
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T]): Table = {
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream.javaStream)
+    ingest(name)
+  }
+
+  /**
+    * Converts the given [[DataStream]] into a [[Table]] with specified field names.
+    *
+    * Example:
+    *
+    * {{{
+    *   val stream: DataStream[(String, Long)] = ...
+    *   val tab: Table = tableEnv.fromDataStream(stream, 'a, 'b)
+    * }}}
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
+
+    val name = createUniqueTableName()
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
+    ingest(name)
+  }
+
+  /**
+    * Registers the given [[DataStream]] as a table in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived
+    * from the type of the [[DataStream]].
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream.javaStream)
+  }
+
+  /**
+    * Registers the given [[DataStream]] as a table with the specified field names in the
+    * [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   val stream: DataStream[(String, Long)] = ...
+    *   tableEnv.registerDataStream("myTable", stream, 'a, 'b)
+    * }}}
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = {
+
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
+    asScalaStream(translate(table))
+  }
+
+  /**
+    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+    * Registered functions can be referenced in Table API and SQL queries.
+    *
+    * @param name The name under which the function is registered.
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
+    */
+  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
+    registerTableFunctionInternal(name, tf)
+  }
+}
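
The streaming counterpart of the batch sketch above, under the same assumptions (made-up data,
`sql(...)` as the SQL entry point):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.{Table, TableEnvironment}
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Hypothetical input stream.
    val words: DataStream[(String, Long)] = env.fromElements(("hello", 1L), ("world", 2L))

    tEnv.registerDataStream("words", words, 'word, 'cnt)
    val frequent: Table = tEnv.sql("SELECT word, cnt FROM words WHERE cnt > 1")
    val out: DataStream[(String, Long)] = tEnv.toDataStream[(String, Long)](frequent)

    out.print()
    env.execute()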

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
new file mode 100644
index 0000000..2a0d571
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table, TableException}
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
+import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
+
+/**
+  * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
+  *
+  * @param table The table to convert.
+  */
+class TableConversions(table: Table) {
+
+  /** Converts the [[Table]] to a [[DataSet]] of the specified type. */
+  def toDataSet[T: TypeInformation]: DataSet[T] = {
+
+    table.tableEnv match {
+      case tEnv: ScalaBatchTableEnv =>
+        tEnv.toDataSet(table)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
+    }
+  }
+
+  /** Converts the [[Table]] to a [[DataStream]] of the specified type. */
+  def toDataStream[T: TypeInformation]: DataStream[T] = {
+
+    table.tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toDataStream(table)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
+}
+
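
With the implicit wrapping from the package object, the two methods read like extension methods
on `Table`. Note the guard in the pattern matches: converting a batch-backed table with
`toDataStream` (or a stream-backed one with `toDataSet`) throws the `TableException` above.
A short sketch, assuming `table` originates from a Scala DataSet:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._  // enables table.toDataSet / table.toDataStream

    // Works: the table was created by a Scala BatchTableEnvironment.
    val rows: DataSet[(String, Int)] = table.filter('amount > 10).toDataSet[(String, Int)]

    // Would throw the TableException above, because the environment is not a
    // Scala StreamTableEnvironment:
    // table.toDataStream[(String, Int)]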