You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:57 UTC
[28/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
new file mode 100644
index 0000000..4e43001
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql2rel.RelDecorrelator
+import org.apache.calcite.tools.{Programs, RuleSet}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.calcite.FlinkPlannerImpl
+import org.apache.flink.table.explain.PlanJsonParser
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
+import org.apache.flink.table.plan.rules.FlinkRuleSets
+import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
+import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
+import org.apache.flink.table.sources.StreamTableSource
+import org.apache.flink.types.Row
+
+/**
+ * The base class for stream TableEnvironments.
+ *
+ * A TableEnvironment can be used to:
+ * - convert [[DataStream]] to a [[Table]]
+ * - register a [[DataStream]] as a table in the catalog
+ * - register a [[Table]] in the catalog
+ * - scan a registered table to obtain a [[Table]]
+ * - specify a SQL query on registered tables to obtain a [[Table]]
+ * - convert a [[Table]] into a [[DataStream]]
+ *
+ * @param execEnv The [[StreamExecutionEnvironment]] which is wrapped in this
+ * [[StreamTableEnvironment]].
+ * @param config The [[TableConfig]] of this [[StreamTableEnvironment]].
+ */
+abstract class StreamTableEnvironment(
+ private[flink] val execEnv: StreamExecutionEnvironment,
+ config: TableConfig)
+ extends TableEnvironment(config) {
+
+ // a counter for unique table names
+ private val nameCntr: AtomicInteger = new AtomicInteger(0)
+
+ // the naming pattern for internally registered tables.
+ private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
+
+ /**
+ * Checks if the chosen table name is valid.
+ *
+ * @param name The table name to check.
+ */
+ override protected def checkValidTableName(name: String): Unit = {
+   // names that follow the internal naming pattern are reserved for internally registered tables
+   if (internalNamePattern.findFirstIn(name).isDefined) {
+     throw new TableException(s"Illegal Table name. " +
+       s"Please choose a name that does not contain the pattern $internalNamePattern")
+   }
+ }
+
+ /** Returns a unique table name according to the internal naming pattern. */
+ protected def createUniqueTableName(): String =
+   s"_DataStreamTable_${nameCntr.getAndIncrement()}"
+
+ /**
+ * Returns field names and field positions for a given [[TypeInformation]].
+ *
+ * Field names are automatically extracted for
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ * The method fails if inputType is not a
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ *
+ * @param inputType The TypeInformation to extract the field names and positions from.
+ * @tparam A The type of the TypeInformation.
+ * @return A tuple of two arrays holding the field names and corresponding field positions.
+ */
+ override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A])
+   : (Array[String], Array[Int]) = {
+   // the generic extraction is done by the parent class; this override only adds the
+   // stream-specific check on the resulting field names
+   val fieldInfo = super.getFieldInfo(inputType)
+   // fieldInfo._1 holds the field names, fieldInfo._2 the corresponding positions
+   if (fieldInfo._1.contains("rowtime")) {
+     throw new TableException("'rowtime' is a reserved field name in stream environment.")
+   }
+   fieldInfo
+ }
+
+ /**
+ * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
+ * [[Expression]].
+ *
+ * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
+ * @param exprs The expressions that define the field names.
+ * @tparam A The type of the TypeInformation.
+ * @return A tuple of two arrays holding the field names and corresponding field positions.
+ */
+ override protected[flink] def getFieldInfo[A](
+     inputType: TypeInformation[A],
+     exprs: Array[Expression])
+   : (Array[String], Array[Int]) = {
+   // resolve names and positions with the generic logic of the parent class first
+   val (fieldNames, fieldIndexes) = super.getFieldInfo(inputType, exprs)
+
+   // 'rowtime' must not be used as a field name in a stream environment
+   if (fieldNames.contains("rowtime")) {
+     throw new TableException("'rowtime' is a reserved field name in stream environment.")
+   }
+
+   (fieldNames, fieldIndexes)
+ }
+
+ /**
+ * Ingests a registered table and returns the resulting [[Table]].
+ *
+ * The table to ingest must be registered in the [[TableEnvironment]]'s catalog.
+ *
+ * @param tableName The name of the table to ingest.
+ * @throws ValidationException if no table is registered under the given name.
+ * @return The ingested table.
+ */
+ @throws[ValidationException]
+ def ingest(tableName: String): Table = {
+   // guard: only tables known to the catalog can be ingested
+   if (!isRegistered(tableName)) {
+     throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
+   }
+
+   // the resulting Table is backed by a catalog node carrying the registered row type
+   new Table(this, CatalogNode(tableName, getRowType(tableName)))
+ }
+
+ /**
+ * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * @param name The name under which the [[StreamTableSource]] is registered.
+ * @param tableSource The [[org.apache.flink.table.sources.StreamTableSource]] to register.
+ */
+ def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = {
+
+ checkValidTableName(name) // throws a TableException for names matching the internal naming pattern
+ registerTableInternal(name, new TableSourceTable(tableSource)) // wraps the source and adds it to the catalog; fails if the name is already registered
+ }
+
+ /**
+ * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ *
+ * @param query The SQL query to evaluate.
+ * @return The result of the query as Table.
+ */
+ override def sql(query: String): Table = {
+   // a fresh planner is created for every query
+   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+
+   // parse the query, validate the parsed tree and convert it into a relational tree
+   val relRoot = planner.rel(planner.validate(planner.parse(query)))
+
+   new Table(this, LogicalRelNode(relRoot.rel))
+ }
+
+ /**
+ * Writes a [[Table]] to a [[TableSink]].
+ *
+ * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
+ * [[TableSink]] to write it.
+ *
+ * @param table The [[Table]] to write.
+ * @param sink The [[TableSink]] to write the [[Table]] to.
+ * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
+ */
+ override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+   sink match {
+     case streamSink: StreamTableSink[T] =>
+       // translate the Table into a DataStream of the type that the TableSink expects
+       val stream = translate[T](table)(sink.getOutputType)
+       // hand the DataStream over to the TableSink to emit it
+       streamSink.emitDataStream(stream)
+
+     case _ =>
+       throw new TableException("StreamTableSink required to emit streaming Table")
+   }
+ }
+
+ /**
+ * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+ * catalog.
+ *
+ * @param name The name under which the table is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register as table in the catalog.
+ * @tparam T the type of the [[DataStream]].
+ */
+ protected def registerDataStreamInternal[T](
+     name: String,
+     dataStream: DataStream[T]): Unit = {
+   // field names and positions are derived from the type of the stream
+   val (names, positions) = getFieldInfo[T](dataStream.getType)
+
+   // wrap the stream as a catalog table and register it under the given name
+   registerTableInternal(name, new DataStreamTable[T](dataStream, positions, names))
+ }
+
+ /**
+ * Registers a [[DataStream]] as a table under a given name with field names as specified by
+ * field expressions in the [[TableEnvironment]]'s catalog.
+ *
+ * @param name The name under which the table is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register as table in the catalog.
+ * @param fields The field expressions to define the field names of the table.
+ * @tparam T The type of the [[DataStream]].
+ */
+ protected def registerDataStreamInternal[T](
+     name: String,
+     dataStream: DataStream[T],
+     fields: Array[Expression]): Unit = {
+
+   // `fields` and both results of getFieldInfo are already arrays, so they are passed on
+   // directly instead of going through additional `.toArray` conversions
+   val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
+   val dataStreamTable = new DataStreamTable[T](
+     dataStream,
+     fieldIndexes,
+     fieldNames
+   )
+   // fails with a TableException if the name is already taken
+   registerTableInternal(name, dataStreamTable)
+ }
+
+ /**
+ * Returns the built-in rules that are defined by the environment.
+ */
+ protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES // implements the abstract member of TableEnvironment that getRuleSet falls back to or extends
+
+ /**
+ * Generates the optimized [[RelNode]] tree from the original relational node tree.
+ *
+ * @param relNode The root node of the relational expression tree.
+ * @return The optimized [[RelNode]] tree
+ */
+ private[flink] def optimize(relNode: RelNode): RelNode = {
+   // step 1: decorrelate the query
+   val decorrelated = RelDecorrelator.decorrelateQuery(relNode)
+
+   // step 2: run the configured rule set, requesting the DataStream convention as output trait
+   val program = Programs.ofRules(getRuleSet)
+   val targetTraits = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify()
+
+   try {
+     program.run(getPlanner, decorrelated, targetTraits)
+   } catch {
+     // the planner could not find a plan: report the original tree to the user
+     case e: CannotPlanException =>
+       throw TableException(
+         s"Cannot generate a valid execution plan for the given query: \n\n" +
+           s"${RelOptUtil.toString(relNode)}\n" +
+           s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+           s"Please check the documentation for the set of currently supported SQL features.", e)
+   }
+ }
+
+
+ /**
+ * Translates a [[Table]] into a [[DataStream]].
+ *
+ * The transformation involves optimizing the relational expression tree as defined by
+ * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+ *
+ * @param table The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
+ * @tparam A The type of the resulting [[DataStream]].
+ * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] =
+   // optimize the relational tree of the table first, then translate the optimized plan
+   translate[A](optimize(table.getRelNode))(tpe)
+
+ /**
+ * Translates a logical [[RelNode]] into a [[DataStream]].
+ *
+ * @param logicalPlan The root node of the relational expression tree.
+ * @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
+ * @tparam A The type of the resulting [[DataStream]].
+ * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ */
+ protected def translate[A]
+   (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+   // the requested result type must be a public, statically accessible class
+   validateType(tpe)
+
+   logicalPlan match {
+     case node: DataStreamRel =>
+       // the node produces an untyped stream; the requested type is passed along as the
+       // expected type and the result is cast back to the requested element type
+       node.translateToPlan(
+         this,
+         Some(tpe.asInstanceOf[TypeInformation[Any]])
+       ).asInstanceOf[DataStream[A]]
+
+     case _ =>
+       // a root node that is not a DataStreamRel cannot be translated; fail with a
+       // descriptive TableException instead of the message-less NotImplementedError of `???`
+       throw TableException(
+         "Cannot generate DataStream due to an invalid logical plan. " +
+           "This is a bug and should not happen. Please file an issue.")
+   }
+ }
+
+ /**
+ * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+ * the result of the given [[Table]].
+ *
+ * @param table The table for which the AST and execution plan will be returned.
+ */
+ def explain(table: Table): String = {
+   val ast = table.getRelNode
+   val optimizedPlan = optimize(ast)
+
+   // translate the optimized plan into a stream of rows so that the execution plan
+   // can be obtained from the stream's execution environment
+   val rowType = TypeExtractor.createTypeInfo(classOf[Row])
+   val dataStream = translate[Row](optimizedPlan)(rowType)
+   val jsonPlan = dataStream.getExecutionEnvironment.getExecutionPlan
+   val physicalPlan = PlanJsonParser.getSqlExecutionPlan(jsonPlan, false)
+
+   // three sections, each consisting of a header line followed by its content
+   Seq(
+     s"== Abstract Syntax Tree ==",
+     s"${RelOptUtil.toString(ast)}",
+     s"== Optimized Logical Plan ==",
+     s"${RelOptUtil.toString(optimizedPlan)}",
+     s"== Physical Execution Plan ==",
+     s"$physicalPlan"
+   ).mkString(System.lineSeparator)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
new file mode 100644
index 0000000..a8876a8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import _root_.java.util.TimeZone
+
+import org.apache.flink.table.calcite.CalciteConfig
+
+/**
+ * A config to define the runtime behavior of the Table API.
+ */
+class TableConfig {
+
+  /**
+    * Defines the timezone for date/time/timestamp conversions. Initialized to UTC.
+    */
+  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")
+
+  /**
+    * Defines if all fields need to be checked for NULL first. Initially enabled.
+    */
+  private var nullCheck: Boolean = true
+
+  /**
+    * Defines if efficient types (such as Tuple types or Atomic types)
+    * should be used within operators where possible. Initially disabled.
+    */
+  private var efficientTypeUsage: Boolean = false
+
+  /**
+    * Defines the configuration of Calcite for Table API and SQL queries.
+    */
+  private var calciteConfig: CalciteConfig = CalciteConfig.DEFAULT
+
+  /**
+    * Sets the timezone for date/time/timestamp conversions.
+    *
+    * @param timeZone The timezone to use; must not be null.
+    */
+  def setTimeZone(timeZone: TimeZone): Unit = {
+    require(timeZone != null, "timeZone must not be null.")
+    this.timeZone = timeZone
+  }
+
+  /**
+    * Returns the timezone for date/time/timestamp conversions.
+    */
+  def getTimeZone: TimeZone = timeZone
+
+  /**
+    * Returns the NULL check. If enabled, all fields need to be checked for NULL first.
+    */
+  def getNullCheck: Boolean = nullCheck
+
+  /**
+    * Sets the NULL check. If enabled, all fields need to be checked for NULL first.
+    */
+  def setNullCheck(nullCheck: Boolean): Unit = {
+    this.nullCheck = nullCheck
+  }
+
+  /**
+    * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types
+    * or Atomic types) are used within operators where possible.
+    *
+    * NOTE: Currently, this is an experimental feature.
+    */
+  def getEfficientTypeUsage: Boolean = efficientTypeUsage
+
+  /**
+    * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types
+    * or Atomic types) are used within operators where possible.
+    *
+    * NOTE: Currently, this is an experimental feature.
+    */
+  def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
+    this.efficientTypeUsage = efficientTypeUsage
+  }
+
+  /**
+    * Returns the current configuration of Calcite for Table API and SQL queries.
+    */
+  def getCalciteConfig: CalciteConfig = calciteConfig
+
+  /**
+    * Sets the configuration of Calcite for Table API and SQL queries.
+    * Changing the configuration has no effect after the first query has been defined.
+    *
+    * @param calciteConfig The Calcite configuration to use; must not be null.
+    */
+  def setCalciteConfig(calciteConfig: CalciteConfig): Unit = {
+    // fail fast here, consistent with setTimeZone, instead of storing a null configuration
+    require(calciteConfig != null, "calciteConfig must not be null.")
+    this.calciteConfig = calciteConfig
+  }
+}
+
+object TableConfig {
+ def DEFAULT = new TableConfig() // a def, not a val: every access creates a new TableConfig instance
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
new file mode 100644
index 0000000..769008f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util.concurrent.atomic.AtomicInteger
+import _root_.java.lang.reflect.Modifier
+
+import org.apache.calcite.config.Lex
+import org.apache.calcite.plan.RelOptPlanner
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
+import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.codegen.ExpressionReducer
+import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.plan.cost.DataSetCostFactory
+import org.apache.flink.table.plan.schema.RelTable
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.validate.FunctionCatalog
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+ * The abstract base class for batch and stream TableEnvironments.
+ *
+ * @param config The configuration of the TableEnvironment
+ */
+abstract class TableEnvironment(val config: TableConfig) {
+
+ // the catalog to hold all registered and translated tables
+ private val tables: SchemaPlus = Frameworks.createRootSchema(true)
+
+ // Table API/SQL function catalog
+ private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
+
+ // the configuration to create a Calcite planner
+ private lazy val frameworkConfig: FrameworkConfig = Frameworks
+ .newConfigBuilder
+ .defaultSchema(tables)
+ .parserConfig(getSqlParserConfig)
+ .costFactory(new DataSetCostFactory)
+ .typeSystem(new FlinkTypeSystem)
+ .operatorTable(getSqlOperatorTable)
+ // set the executor to evaluate constant expressions
+ .executor(new ExpressionReducer(config))
+ .build
+
+ // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
+ protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
+
+ // the planner instance used to optimize queries of this TableEnvironment
+ private lazy val planner: RelOptPlanner = relBuilder.getPlanner
+
+ private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
+
+ // a counter for unique attribute names
+ private val attrNameCntr: AtomicInteger = new AtomicInteger(0)
+
+ /** Returns the table config to define the runtime behavior of the Table API. */
+ def getConfig: TableConfig = config
+
+ /**
+ * Returns the operator table for this environment including a custom Calcite configuration.
+ */
+ protected def getSqlOperatorTable: SqlOperatorTable = {
+   val calciteConfig = config.getCalciteConfig
+
+   calciteConfig.getSqlOperatorTable match {
+     // a custom operator table that is configured to replace the built-in one
+     case Some(custom) if calciteConfig.replacesSqlOperatorTable =>
+       custom
+
+     // a custom operator table that is chained after the built-in one
+     case Some(custom) =>
+       ChainedSqlOperatorTable.of(functionCatalog.getSqlOperatorTable, custom)
+
+     // no custom operator table: use the one of the function catalog
+     case None =>
+       functionCatalog.getSqlOperatorTable
+   }
+ }
+
+ /**
+ * Returns the rule set for this environment including a custom Calcite configuration.
+ */
+ protected def getRuleSet: RuleSet = {
+   val calciteConfig = config.getCalciteConfig
+
+   calciteConfig.getRuleSet match {
+     // custom rules that are configured to replace the built-in rules
+     case Some(custom) if calciteConfig.replacesRuleSet =>
+       custom
+
+     // custom rules that are appended to the built-in rules
+     case Some(custom) =>
+       RuleSets.ofList((getBuiltInRuleSet.asScala ++ custom.asScala).asJava)
+
+     // no custom rules: use the built-in rules only
+     case None =>
+       getBuiltInRuleSet
+   }
+ }
+
+ /**
+ * Returns the SQL parser config for this environment including a custom Calcite configuration.
+ */
+ protected def getSqlParserConfig: SqlParser.Config = {
+   // fallback: Java lex is used because back ticks are easier than double quotes in
+   // programming and cases are preserved
+   def javaLexConfig: SqlParser.Config =
+     SqlParser
+       .configBuilder()
+       .setLex(Lex.JAVA)
+       .build()
+
+   // a parser config of the custom Calcite configuration takes precedence
+   config.getCalciteConfig.getSqlParserConfig.getOrElse(javaLexConfig)
+ }
+
+ /**
+ * Returns the built-in rules that are defined by the environment.
+ */
+ protected def getBuiltInRuleSet: RuleSet
+
+ /**
+ * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
+ * user-defined functions under this name.
+ */
+ def registerFunction(name: String, function: ScalarFunction): Unit = {
+   val functionClass = function.getClass
+
+   // the function class must be instantiable
+   checkForInstantiation(functionClass)
+
+   // make the function available to the Table API ...
+   functionCatalog.registerFunction(name, functionClass)
+
+   // ... and to SQL
+   val sqlFunction = createScalarSqlFunction(name, function, typeFactory)
+   functionCatalog.registerSqlFunction(sqlFunction)
+ }
+
+ /**
+ * Registers a [[TableFunction]] under a unique name. Replaces already existing
+ * user-defined functions under this name.
+ */
+ private[flink] def registerTableFunctionInternal[T: TypeInformation](
+     name: String, function: TableFunction[T]): Unit = {
+   val functionClass = function.getClass
+
+   // the function must not be a Scala object and its class must be instantiable
+   checkNotSingleton(functionClass)
+   checkForInstantiation(functionClass)
+
+   // a result type provided by the function wins over the implicitly derived type
+   val typeInfo: TypeInformation[_] =
+     if (function.getResultType == null) {
+       implicitly[TypeInformation[T]]
+     } else {
+       function.getResultType
+     }
+
+   // make the function available to the Table API ...
+   functionCatalog.registerFunction(name, functionClass)
+
+   // ... and to SQL
+   functionCatalog.registerSqlFunctions(
+     createTableSqlFunctions(name, function, typeInfo, typeFactory))
+ }
+
+ /**
+ * Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * @param name The name under which the table is registered.
+ * @param table The table to register.
+ */
+ def registerTable(name: String, table: Table): Unit = {
+   // guard: tables of other table environments cannot be registered here
+   if (table.tableEnv != this) {
+     throw new TableException(
+       "Only tables that belong to this TableEnvironment can be registered.")
+   }
+
+   checkValidTableName(name)
+   // the table is registered as a wrapper around its relational expression tree
+   registerTableInternal(name, new RelTable(table.getRelNode))
+ }
+
+ /**
+ * Replaces a registered Table with another Table under the same name.
+ * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
+ * with a [[org.apache.calcite.schema.TranslatableTable]].
+ *
+ * @param name Name of the table to replace.
+ * @param table The table that replaces the previous table.
+ */
+ protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
+   // guard: only an existing registration can be replaced
+   if (!isRegistered(name)) {
+     throw new TableException(s"Table \'$name\' is not registered.")
+   }
+
+   tables.add(name, table)
+ }
+
+ /**
+ * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ *
+ * @param query The SQL query to evaluate.
+ * @return The result of the query as Table.
+ */
+ def sql(query: String): Table
+
+ /**
+ * Writes a [[Table]] to a [[TableSink]].
+ *
+ * @param table The [[Table]] to write.
+ * @param sink The [[TableSink]] to write the [[Table]] to.
+ * @tparam T The data type that the [[TableSink]] expects.
+ */
+ private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
+
+ /**
+ * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
+ *
+ * @param name The name under which the table is registered.
+ * @param table The table to register in the catalog
+ * @throws TableException if another table is registered under the provided name.
+ */
+ @throws[TableException]
+ protected def registerTableInternal(name: String, table: AbstractTable): Unit = {
+   // guard: an existing registration is never overwritten by this method
+   if (isRegistered(name)) {
+     throw new TableException(s"Table \'$name\' already exists. " +
+       s"Please, choose a different name.")
+   }
+
+   tables.add(name, table)
+ }
+
+ /**
+ * Checks if the chosen table name is valid.
+ *
+ * @param name The table name to check.
+ */
+ protected def checkValidTableName(name: String): Unit
+
+ /**
+ * Checks if a table is registered under the given name.
+ *
+ * @param name The table name to check.
+ * @return true, if a table is registered under the name, false otherwise.
+ */
+ protected def isRegistered(name: String): Boolean =
+   // looks the name up among the table names of the catalog schema
+   tables.getTableNames.contains(name)
+
+ // Returns the row type of the table that is registered under the given name, derived with
+ // the type factory of this environment.
+ // NOTE(review): no check that the table exists is made here - presumably callers verify
+ // the registration first (e.g. via isRegistered); confirm the catalog's behavior otherwise.
+ protected def getRowType(name: String): RelDataType =
+   tables.getTable(name).getRowType(typeFactory)
+
+ /** Returns a unique temporary attribute name. */
+ private[flink] def createUniqueAttributeName(): String =
+   // every call consumes the next value of the attribute name counter
+   s"TMP_${attrNameCntr.getAndIncrement()}"
+
+ /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
+ private[flink] def getRelBuilder: FlinkRelBuilder = relBuilder
+
+ /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
+ private[flink] def getPlanner: RelOptPlanner = planner
+
+ /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
+ private[flink] def getTypeFactory: FlinkTypeFactory = typeFactory
+
+ // Returns the function catalog of this TableEnvironment, i.e., the catalog that
+ // registerFunction and registerTableFunctionInternal register functions in.
+ private[flink] def getFunctionCatalog: FunctionCatalog = functionCatalog
+
+ /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
+ private[flink] def getFrameworkConfig: FrameworkConfig = frameworkConfig
+
+ // Checks that the class described by the given type information is globally accessible.
+ // Throws a TableException if the class is a non-static member class, is not public, or
+ // has no canonical name.
+ protected def validateType(typeInfo: TypeInformation[_]): Unit = {
+   val clazz = typeInfo.getTypeClass
+   val modifiers = clazz.getModifiers
+
+   val nonStaticMemberClass = clazz.isMemberClass && !Modifier.isStatic(modifiers)
+   val notPublic = !Modifier.isPublic(modifiers)
+   val withoutCanonicalName = clazz.getCanonicalName == null
+
+   if (nonStaticMemberClass || notPublic || withoutCanonicalName) {
+     throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+       s"static and globally accessible.")
+   }
+ }
+
+ /**
+ * Returns field names and field positions for a given [[TypeInformation]].
+ *
+ * Field names are automatically extracted for
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ * The method fails if inputType is not a
+ * [[org.apache.flink.api.common.typeutils.CompositeType]].
+ *
+ * @param inputType The TypeInformation to extract the field names and positions from.
+ * @tparam A The type of the TypeInformation.
+ * @return A tuple of two arrays holding the field names and corresponding field positions.
+ */
+ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
+   (Array[String], Array[Int]) = {
+   validateType(inputType)
+
+   // only types that carry their own field names are supported
+   val names: Array[String] = inputType match {
+     case tuple: TupleTypeInfo[A] => tuple.getFieldNames
+     case caseClass: CaseClassTypeInfo[A] => caseClass.getFieldNames
+     case pojo: PojoTypeInfo[A] => pojo.getFieldNames
+     case row: RowTypeInfo => row.getFieldNames
+     case other =>
+       throw new TableException(s"Type $other lacks explicit field naming")
+   }
+
+   if (names.contains("*")) {
+     throw new TableException("Field name can not be '*'.")
+   }
+
+   // fields keep their original order, so the positions are simply 0 until names.length
+   (names, names.indices.toArray)
+ }
+
+ /**
+ * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
+ * [[Expression]].
+ *
+ * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
+ * @param exprs The expressions that define the field names.
+ * @tparam A The type of the TypeInformation.
+ * @return A tuple of two arrays holding the field names and corresponding field positions.
+ */
+ protected[flink] def getFieldInfo[A](
+ inputType: TypeInformation[A],
+ exprs: Array[Expression]): (Array[String], Array[Int]) = {
+
+ validateType(inputType) // rejects type classes that are not public or are non-static member classes
+
+ val indexedNames: Array[(Int, String)] = inputType match { // one (position in input type, table field name) pair per expression
+ case a: AtomicType[A] => // atomic input: exactly one expression that names the only field at position 0
+ if (exprs.length != 1) {
+ throw new TableException("Table of atomic type can only have a single field.")
+ }
+ exprs.map {
+ case UnresolvedFieldReference(name) => (0, name)
+ case _ => throw new TableException("Field reference expression expected.")
+ }
+ case t: TupleTypeInfo[A] => // tuple input: plain references are positional, aliases are looked up by name
+ exprs.zipWithIndex.map {
+ case (UnresolvedFieldReference(name), idx) => (idx, name) // position is the index of the expression in exprs
+ case (Alias(UnresolvedFieldReference(origName), name, _), _) => // position is looked up via the original field name
+ val idx = t.getFieldIndex(origName)
+ if (idx < 0) {
+ throw new TableException(s"$origName is not a field of type $t")
+ }
+ (idx, name)
+ case _ => throw new TableException(
+ "Field reference expression or alias on field expression expected.")
+ }
+ case c: CaseClassTypeInfo[A] => // case class input: same resolution rules as in the tuple branch
+ exprs.zipWithIndex.map {
+ case (UnresolvedFieldReference(name), idx) => (idx, name)
+ case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+ val idx = c.getFieldIndex(origName)
+ if (idx < 0) {
+ throw new TableException(s"$origName is not a field of type $c")
+ }
+ (idx, name)
+ case _ => throw new TableException(
+ "Field reference expression or alias on field expression expected.")
+ }
+ case p: PojoTypeInfo[A] => // POJO input: every expression is resolved by field name, never by its index in exprs
+ exprs.map {
+ case (UnresolvedFieldReference(name)) =>
+ val idx = p.getFieldIndex(name)
+ if (idx < 0) {
+ throw new TableException(s"$name is not a field of type $p")
+ }
+ (idx, name)
+ case Alias(UnresolvedFieldReference(origName), name, _) =>
+ val idx = p.getFieldIndex(origName)
+ if (idx < 0) {
+ throw new TableException(s"$origName is not a field of type $p")
+ }
+ (idx, name)
+ case _ => throw new TableException(
+ "Field reference expression or alias on field expression expected.")
+ }
+ case tpe => throw new TableException( // any other type information is rejected
+ s"Source of type $tpe cannot be converted into Table.")
+ }
+
+ val (fieldIndexes, fieldNames) = indexedNames.unzip // split the pairs into positions and names
+
+ if (fieldNames.contains("*")) {
+ throw new TableException("Field name can not be '*'.")
+ }
+
+ (fieldNames.toArray, fieldIndexes.toArray) // note the order of the result: names first, then positions
+ }
+
+}
+
+/**
+ * Factory for [[TableEnvironment]]s. The concrete environment is selected by the type of the
+ * given batch or stream execution environment.
+ */
+object TableEnvironment {
+
+ /**
+ * Creates a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]], using a newly created
+ * [[TableConfig]].
+ *
+ * @param executionEnvironment The Java batch ExecutionEnvironment.
+ */
+ def getTableEnvironment(executionEnvironment: JavaBatchExecEnv): JavaBatchTableEnv =
+ getTableEnvironment(executionEnvironment, new TableConfig())
+
+ /**
+ * Creates a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]] and a given [[TableConfig]].
+ *
+ * @param executionEnvironment The Java batch ExecutionEnvironment.
+ * @param tableConfig The TableConfig for the new TableEnvironment.
+ */
+ def getTableEnvironment(
+ executionEnvironment: JavaBatchExecEnv,
+ tableConfig: TableConfig): JavaBatchTableEnv =
+ new JavaBatchTableEnv(executionEnvironment, tableConfig)
+
+ /**
+ * Creates a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]], using a newly created
+ * [[TableConfig]].
+ *
+ * @param executionEnvironment The Scala batch ExecutionEnvironment.
+ */
+ def getTableEnvironment(executionEnvironment: ScalaBatchExecEnv): ScalaBatchTableEnv =
+ getTableEnvironment(executionEnvironment, new TableConfig())
+
+ /**
+ * Creates a [[ScalaBatchTableEnv]] for a Scala [[ScalaBatchExecEnv]] and a given
+ * [[TableConfig]].
+ *
+ * @param executionEnvironment The Scala batch ExecutionEnvironment.
+ * @param tableConfig The TableConfig for the new TableEnvironment.
+ */
+ def getTableEnvironment(
+ executionEnvironment: ScalaBatchExecEnv,
+ tableConfig: TableConfig): ScalaBatchTableEnv =
+ new ScalaBatchTableEnv(executionEnvironment, tableConfig)
+
+ /**
+ * Creates a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]], using a newly created
+ * [[TableConfig]].
+ *
+ * @param executionEnvironment The Java StreamExecutionEnvironment.
+ */
+ def getTableEnvironment(executionEnvironment: JavaStreamExecEnv): JavaStreamTableEnv =
+ getTableEnvironment(executionEnvironment, new TableConfig())
+
+ /**
+ * Creates a [[JavaStreamTableEnv]] for a Java [[JavaStreamExecEnv]] and a given
+ * [[TableConfig]].
+ *
+ * @param executionEnvironment The Java StreamExecutionEnvironment.
+ * @param tableConfig The TableConfig for the new TableEnvironment.
+ */
+ def getTableEnvironment(
+ executionEnvironment: JavaStreamExecEnv,
+ tableConfig: TableConfig): JavaStreamTableEnv =
+ new JavaStreamTableEnv(executionEnvironment, tableConfig)
+
+ /**
+ * Creates a [[ScalaStreamTableEnv]] for a Scala [[ScalaStreamExecEnv]], using a newly created
+ * [[TableConfig]].
+ *
+ * @param executionEnvironment The Scala StreamExecutionEnvironment.
+ */
+ def getTableEnvironment(executionEnvironment: ScalaStreamExecEnv): ScalaStreamTableEnv =
+ getTableEnvironment(executionEnvironment, new TableConfig())
+
+ /**
+ * Creates a [[ScalaStreamTableEnv]] for a Scala [[ScalaStreamExecEnv]] and a given
+ * [[TableConfig]].
+ *
+ * @param executionEnvironment The Scala StreamExecutionEnvironment.
+ * @param tableConfig The TableConfig for the new TableEnvironment.
+ */
+ def getTableEnvironment(
+ executionEnvironment: ScalaStreamExecEnv,
+ tableConfig: TableConfig): ScalaStreamTableEnv =
+ new ScalaStreamTableEnv(executionEnvironment, tableConfig)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
new file mode 100644
index 0000000..939cb67
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+/**
+ * This object enumerates all supported types of the Table API.
+ *
+ * Every member is an alias for a constant of [[BasicTypeInfo]], [[SqlTimeTypeInfo]], or
+ * [[TimeIntervalTypeInfo]].
+ */
+object Types {
+
+ // character string and boolean types
+ val STRING = BasicTypeInfo.STRING_TYPE_INFO
+ val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+ // numeric types
+ val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
+ val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
+ val INT = BasicTypeInfo.INT_TYPE_INFO
+ val LONG = BasicTypeInfo.LONG_TYPE_INFO
+ val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
+ val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
+ val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+
+ // SQL date/time types and time intervals
+ val DATE = SqlTimeTypeInfo.DATE
+ val TIME = SqlTimeTypeInfo.TIME
+ val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+ val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
+ val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
new file mode 100644
index 0000000..7ca45a6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+/**
+ * Exception for all errors occurring during expression parsing.
+ *
+ * @param msg The error message.
+ */
+case class ExpressionParserException(
+ msg: String)
+ extends RuntimeException(msg)
+
+/**
+ * Exception for all errors occurring during sql parsing.
+ *
+ * @param msg The error message.
+ * @param cause The underlying cause; `null` when the exception is created from a message only.
+ */
+case class SqlParserException(
+ msg: String,
+ cause: Throwable)
+ extends RuntimeException(msg, cause) {
+
+ /** Creates the exception from a message only, without a cause. */
+ def this(msg: String) = this(msg, null)
+
+}
+
+/**
+ * Companion providing the message-only factory, in line with [[TableException]] and
+ * [[ValidationException]], so that `SqlParserException(msg)` works without `new`.
+ */
+object SqlParserException {
+ def apply(msg: String): SqlParserException = new SqlParserException(msg)
+}
+
+/**
+ * General exception for all errors during table handling.
+ *
+ * @param msg The error message.
+ * @param cause The underlying cause; `null` when the exception is created from a message only.
+ */
+case class TableException(msg: String, cause: Throwable)
+ extends RuntimeException(msg, cause) {
+
+ /** Creates the exception from a message only, without a cause. */
+ def this(msg: String) = this(msg, null)
+}
+
+/** Companion that allows `TableException(msg)` without the `new` keyword. */
+object TableException {
+ def apply(msg: String): TableException = new TableException(msg, null)
+}
+
+/**
+ * Exception for all errors occurring during validation phase.
+ *
+ * @param msg The error message.
+ * @param cause The underlying cause; `null` when the exception is created from a message only.
+ */
+case class ValidationException(msg: String, cause: Throwable)
+ extends RuntimeException(msg, cause) {
+
+ /** Creates the exception from a message only, without a cause. */
+ def this(msg: String) = this(msg, null)
+}
+
+/** Companion that allows `ValidationException(msg)` without the `new` keyword. */
+object ValidationException {
+ def apply(msg: String): ValidationException = new ValidationException(msg, null)
+}
+
+/**
+ * Exception for unwanted method calling on unresolved expression.
+ *
+ * @param msg The error message.
+ */
+case class UnresolvedException(
+ msg: String)
+ extends RuntimeException(msg)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
new file mode 100644
index 0000000..15e842e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.java
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.TableFunction
+
+/**
+ * The [[TableEnvironment]] for a Java batch [[DataSet]]
+ * [[ExecutionEnvironment]].
+ *
+ * A TableEnvironment can be used to:
+ * - convert a [[DataSet]] to a [[Table]]
+ * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
+ * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+ * - scan a registered table to obtain a [[Table]]
+ * - specify a SQL query on registered tables to obtain a [[Table]]
+ * - convert a [[Table]] into a [[DataSet]]
+ * - explain the AST and execution plan of a [[Table]]
+ *
+ * @param execEnv The Java batch [[ExecutionEnvironment]] of the TableEnvironment.
+ * @param config The configuration of the TableEnvironment.
+ */
+class BatchTableEnvironment(
+ execEnv: ExecutionEnvironment,
+ config: TableConfig)
+ extends org.apache.flink.table.api.BatchTableEnvironment(execEnv, config) {
+
+ /**
+ * Converts the given [[DataSet]] into a [[Table]].
+ *
+ * The [[DataSet]] is registered under a generated unique table name and then scanned.
+ * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+ *
+ * @param dataSet The [[DataSet]] to be converted.
+ * @tparam T The type of the [[DataSet]].
+ * @return The converted [[Table]].
+ */
+ def fromDataSet[T](dataSet: DataSet[T]): Table = {
+ val tableName = createUniqueTableName()
+ registerDataSetInternal(tableName, dataSet)
+ scan(tableName)
+ }
+
+ /**
+ * Converts the given [[DataSet]] into a [[Table]] with specified field names.
+ *
+ * The field expression string is parsed first; the [[DataSet]] is then registered under a
+ * generated unique table name and scanned.
+ *
+ * Example:
+ *
+ * {{{
+ * DataSet<Tuple2<String, Long>> set = ...
+ * Table tab = tableEnv.fromDataSet(set, "a, b")
+ * }}}
+ *
+ * @param dataSet The [[DataSet]] to be converted.
+ * @param fields The field names of the resulting [[Table]].
+ * @tparam T The type of the [[DataSet]].
+ * @return The converted [[Table]].
+ */
+ def fromDataSet[T](dataSet: DataSet[T], fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields).toArray
+
+ val tableName = createUniqueTableName()
+ registerDataSetInternal(tableName, dataSet, fieldExprs)
+ scan(tableName)
+ }
+
+ /**
+ * Registers the given [[DataSet]] as table in the
+ * [[TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * The table name is checked with `checkValidTableName` before the registration.
+ * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+ *
+ * @param name The name under which the [[DataSet]] is registered in the catalog.
+ * @param dataSet The [[DataSet]] to register.
+ * @tparam T The type of the [[DataSet]] to register.
+ */
+ def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
+ checkValidTableName(name)
+ registerDataSetInternal(name, dataSet)
+ }
+
+ /**
+ * Registers the given [[DataSet]] as table with specified field names in the
+ * [[TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * Example:
+ *
+ * {{{
+ * DataSet<Tuple2<String, Long>> set = ...
+ * tableEnv.registerDataSet("myTable", set, "a, b")
+ * }}}
+ *
+ * @param name The name under which the [[DataSet]] is registered in the catalog.
+ * @param dataSet The [[DataSet]] to register.
+ * @param fields The field names of the registered table.
+ * @tparam T The type of the [[DataSet]] to register.
+ */
+ def registerDataSet[T](name: String, dataSet: DataSet[T], fields: String): Unit = {
+ // the field expressions are parsed before the table name is checked
+ val fieldExprs = ExpressionParser.parseExpressionList(fields).toArray
+
+ checkValidTableName(name)
+ registerDataSetInternal(name, dataSet, fieldExprs)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+ *
+ * The type information of the result is created from `clazz` with [[TypeExtractor]].
+ *
+ * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+ * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the type of the resulting [[DataSet]].
+ * @tparam T The type of the resulting [[DataSet]].
+ * @return The converted [[DataSet]].
+ */
+ def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
+ val resultType: TypeInformation[T] = TypeExtractor.createTypeInfo(clazz)
+ translate[T](table)(resultType)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataSet]] of a specified type.
+ *
+ * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+ * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] that specifies the type of the resulting [[DataSet]].
+ * @tparam T The type of the resulting [[DataSet]].
+ * @return The converted [[DataSet]].
+ */
+ def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] =
+ translate[T](table)(typeInfo)
+
+ /**
+ * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+ * Registered functions can be referenced in Table API and SQL queries.
+ *
+ * The type information of the function's result is obtained with [[TypeExtractor]] and handed
+ * to the registration implicitly.
+ *
+ * @param name The name under which the function is registered.
+ * @param tf The TableFunction to register.
+ * @tparam T The type of the output row.
+ */
+ def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
+ val extracted = TypeExtractor.createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
+ implicit val resultTypeInfo: TypeInformation[T] = extracted.asInstanceOf[TypeInformation[T]]
+
+ registerTableFunctionInternal[T](name, tf)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
new file mode 100644
index 0000000..428dcae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.java
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.expressions.ExpressionParser
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+/**
+ * The [[TableEnvironment]] for a Java [[StreamExecutionEnvironment]].
+ *
+ * A TableEnvironment can be used to:
+ * - convert a [[DataStream]] to a [[Table]]
+ * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
+ * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+ * - scan a registered table to obtain a [[Table]]
+ * - specify a SQL query on registered tables to obtain a [[Table]]
+ * - convert a [[Table]] into a [[DataStream]]
+ * - explain the AST and execution plan of a [[Table]]
+ *
+ * @param execEnv The Java [[StreamExecutionEnvironment]] of the TableEnvironment.
+ * @param config The configuration of the TableEnvironment.
+ */
+class StreamTableEnvironment(
+ execEnv: StreamExecutionEnvironment,
+ config: TableConfig)
+ extends org.apache.flink.table.api.StreamTableEnvironment(execEnv, config) {
+
+ /**
+ * Converts the given [[DataStream]] into a [[Table]].
+ *
+ * The [[DataStream]] is registered under a generated unique table name and then ingested.
+ * The field names of the [[Table]] are automatically derived from the type of the
+ * [[DataStream]].
+ *
+ * @param dataStream The [[DataStream]] to be converted.
+ * @tparam T The type of the [[DataStream]].
+ * @return The converted [[Table]].
+ */
+ def fromDataStream[T](dataStream: DataStream[T]): Table = {
+ val tableName = createUniqueTableName()
+ registerDataStreamInternal(tableName, dataStream)
+ ingest(tableName)
+ }
+
+ /**
+ * Converts the given [[DataStream]] into a [[Table]] with specified field names.
+ *
+ * The field expression string is parsed first; the [[DataStream]] is then registered under a
+ * generated unique table name and ingested.
+ *
+ * Example:
+ *
+ * {{{
+ * DataStream<Tuple2<String, Long>> stream = ...
+ * Table tab = tableEnv.fromDataStream(stream, "a, b")
+ * }}}
+ *
+ * @param dataStream The [[DataStream]] to be converted.
+ * @param fields The field names of the resulting [[Table]].
+ * @tparam T The type of the [[DataStream]].
+ * @return The converted [[Table]].
+ */
+ def fromDataStream[T](dataStream: DataStream[T], fields: String): Table = {
+ val fieldExprs = ExpressionParser.parseExpressionList(fields).toArray
+
+ val tableName = createUniqueTableName()
+ registerDataStreamInternal(tableName, dataStream, fieldExprs)
+ ingest(tableName)
+ }
+
+ /**
+ * Registers the given [[DataStream]] as table in the
+ * [[TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * The table name is checked with `checkValidTableName` before the registration.
+ * The field names of the [[Table]] are automatically derived
+ * from the type of the [[DataStream]].
+ *
+ * @param name The name under which the [[DataStream]] is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register.
+ * @tparam T The type of the [[DataStream]] to register.
+ */
+ def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+ checkValidTableName(name)
+ registerDataStreamInternal(name, dataStream)
+ }
+
+ /**
+ * Registers the given [[DataStream]] as table with specified field names in the
+ * [[TableEnvironment]]'s catalog.
+ * Registered tables can be referenced in SQL queries.
+ *
+ * Example:
+ *
+ * {{{
+ * DataStream<Tuple2<String, Long>> stream = ...
+ * tableEnv.registerDataStream("myTable", stream, "a, b")
+ * }}}
+ *
+ * @param name The name under which the [[DataStream]] is registered in the catalog.
+ * @param dataStream The [[DataStream]] to register.
+ * @param fields The field names of the registered table.
+ * @tparam T The type of the [[DataStream]] to register.
+ */
+ def registerDataStream[T](name: String, dataStream: DataStream[T], fields: String): Unit = {
+ // the field expressions are parsed before the table name is checked
+ val fieldExprs = ExpressionParser.parseExpressionList(fields).toArray
+
+ checkValidTableName(name)
+ registerDataStreamInternal(name, dataStream, fieldExprs)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+ *
+ * The type information of the result is created from `clazz` with [[TypeExtractor]].
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param clazz The class of the type of the resulting [[DataStream]].
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+ val resultType: TypeInformation[T] = TypeExtractor.createTypeInfo(clazz)
+ translate[T](table)(resultType)
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of a specified type.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+ * - [[Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * types: Fields are mapped by position, field types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @param typeInfo The [[TypeInformation]] that specifies the type of the [[DataStream]].
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toDataStream[T](table: Table, typeInfo: TypeInformation[T]): DataStream[T] =
+ translate[T](table)(typeInfo)
+
+ /**
+ * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
+ * Registered functions can be referenced in Table API and SQL queries.
+ *
+ * The type information of the function's result is obtained with [[TypeExtractor]] and handed
+ * to the registration implicitly.
+ *
+ * @param name The name under which the function is registered.
+ * @param tf The TableFunction to register.
+ * @tparam T The type of the output row.
+ */
+ def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
+ val extracted = TypeExtractor.createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0)
+ implicit val resultTypeInfo: TypeInformation[T] = extracted.asInstanceOf[TypeInformation[T]]
+
+ registerTableFunctionInternal[T](name, tf)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
new file mode 100644
index 0000000..9c82e9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/groupWindows.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java
+
+import org.apache.flink.table.api.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+ * Helper object for creating a tumbling window. Tumbling windows are consecutive,
+ * non-overlapping windows of a specified fixed length. For example, a tumbling window of
+ * 5 minutes size groups elements in 5 minutes intervals.
+ */
+object Tumble {
+
+ /**
+ * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
+ * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
+ * elements in 5 minutes intervals.
+ *
+ * @param size the size of the window as time or row-count interval.
+ * @return a tumbling window
+ */
+ def over(size: String): TumblingWindow = new TumblingWindow(size)
+}
+
+/**
+ * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
+ * a specified slide interval. If the slide interval is smaller than the window size, sliding
+ * windows are overlapping. Thus, an element can be assigned to multiple windows.
+ *
+ * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
+ * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
+ * window evaluations.
+ */
+object Slide {
+
+ /**
+ * Creates a sliding window. Sliding windows have a fixed size and slide by
+ * a specified slide interval. If the slide interval is smaller than the window size, sliding
+ * windows are overlapping. Thus, an element can be assigned to multiple windows.
+ *
+ * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
+ * elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+ * consecutive window evaluations.
+ *
+ * The returned window only carries the size.
+ *
+ * @param size the size of the window as time or row-count interval
+ * @return a partially specified sliding window
+ */
+ def over(size: String): SlideWithSize = new SlideWithSize(size)
+}
+
+/**
+ * Helper object for creating a session window. The boundaries of session windows are defined by
+ * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+ * gap period.
+ */
+object Session {
+
+ /**
+ * Creates a session window. The boundaries of session windows are defined by
+ * intervals of inactivity, i.e., a session window is closed if no event appears for a defined
+ * gap period.
+ *
+ * @param gap specifies how long (as interval of milliseconds) to wait for new data before
+ * closing the session window.
+ * @return a session window
+ */
+ def withGap(gap: String): SessionWindow = new SessionWindow(gap)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala
new file mode 100644
index 0000000..e16e1a8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// The enclosing package is `org.apache.flink.table`, so that the package object below is
+// `org.apache.flink.table.api` itself and not a nested `org.apache.flink.table.api.api`.
+package org.apache.flink.table
+
+/**
+ * == Table API ==
+ *
+ * This package contains the API of the Table API. It can be used with Flink Streaming
+ * and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a [[org.apache.flink.table.api.Table]] from
+ * a DataSet or DataStream. Relational operations can be performed on this table. A table can
+ * also be converted back to a DataSet or DataStream.
+ *
+ * Packages [[org.apache.flink.table.api.scala]] and [[org.apache.flink.table.api.java]] contain
+ * the language specific part of the API. Refer to these packages for documentation on how
+ * the Table API can be used in Java and Scala.
+ */
+package object api
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
new file mode 100644
index 0000000..3ae8c31
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.TableFunction
+
+import _root_.scala.reflect.ClassTag
+
+/**
+  * Scala-specific [[TableEnvironment]] for batch programs that work with a [[DataSet]]
+  * [[ExecutionEnvironment]].
+  *
+  * A TableEnvironment can be used to:
+  * - turn a [[DataSet]] into a [[Table]]
+  * - add a [[DataSet]] to the catalog of the [[TableEnvironment]]
+  * - add a [[Table]] to the catalog of the [[TableEnvironment]]
+  * - scan a registered table to obtain a [[Table]]
+  * - run a SQL query over registered tables to obtain a [[Table]]
+  * - turn a [[Table]] back into a [[DataSet]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Scala batch [[ExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class BatchTableEnvironment(
+    execEnv: ExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.table.api.BatchTableEnvironment(execEnv.getJavaEnv, config) {
+
+  /**
+    * Creates a [[Table]] from the given [[DataSet]] by registering it under a generated
+    * unique table name and scanning that name.
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T]): Table = {
+    // Register the underlying Java DataSet under a generated name, then scan that name.
+    val tableName = createUniqueTableName()
+    registerDataSetInternal(tableName, dataSet.javaSet)
+    scan(tableName)
+  }
+
+  /**
+    * Creates a [[Table]] from the given [[DataSet]], naming its fields as specified.
+    *
+    * Example:
+    *
+    * {{{
+    *   val set: DataSet[(String, Long)] = ...
+    *   val tab: Table = tableEnv.fromDataSet(set, 'a, 'b)
+    * }}}
+    *
+    * @param dataSet The [[DataSet]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataSet]].
+    * @return The converted [[Table]].
+    */
+  def fromDataSet[T](dataSet: DataSet[T], fields: Expression*): Table = {
+    // Same as the single-argument variant, but the field expressions are passed along.
+    val tableName = createUniqueTableName()
+    registerDataSetInternal(tableName, dataSet.javaSet, fields.toArray)
+    scan(tableName)
+  }
+
+  /**
+    * Adds the given [[DataSet]] as a table to the catalog of the [[TableEnvironment]]
+    * once the name has passed the table name check.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit = {
+    // User-supplied names are checked first; generated names in fromDataSet skip this step.
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet.javaSet)
+  }
+
+  /**
+    * Adds the given [[DataSet]] as a table with the specified field names to the catalog
+    * of the [[TableEnvironment]].
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   val set: DataSet[(String, Long)] = ...
+    *   tableEnv.registerDataSet("myTable", set, 'a, 'b)
+    * }}}
+    *
+    * @param name The name under which the [[DataSet]] is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataSet]] to register.
+    */
+  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: Expression*): Unit = {
+    // User-supplied names are checked first; generated names in fromDataSet skip this step.
+    checkValidTableName(name)
+    registerDataSetInternal(name, dataSet.javaSet, fields.toArray)
+  }
+
+  /**
+    * Translates the given [[Table]] into a [[DataSet]] of the requested type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataSet]].
+    * @return The converted [[DataSet]].
+    */
+  def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
+    wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+  }
+
+  /**
+    * Adds a [[TableFunction]] to the TableEnvironment's catalog under the given name.
+    * Registered functions can be referenced in Table API and SQL queries.
+    *
+    * @param name The name under which the function is registered.
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
+    */
+  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
+    registerTableFunctionInternal(name, tf)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
new file mode 100644
index 0000000..4b92bdb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions.Expression
+
+/**
+  * Provides methods to turn a [[DataSet]] into a [[Table]].
+  *
+  * @param dataSet The [[DataSet]] to convert.
+  * @param inputType The [[TypeInformation]] of the [[DataSet]]'s type (unused in this class).
+  * @tparam T The type of the [[DataSet]].
+  */
+class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
+
+  /**
+    * Turns the [[DataSet]] into a [[Table]].
+    *
+    * The field names of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = ExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val set: DataSet[(String, Int)] = ...
+    *   val table = set.toTable(tEnv, 'name, 'amount)
+    * }}}
+    *
+    * When no fields are given, the field names are automatically extracted from the type of
+    * the [[DataSet]].
+    *
+    * @param tableEnv The [[BatchTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toTable(tableEnv: BatchTableEnvironment, fields: Expression*): Table = {
+    if (fields.nonEmpty) {
+      tableEnv.fromDataSet(dataSet, fields: _*)
+    } else {
+      tableEnv.fromDataSet(dataSet)
+    }
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
new file mode 100644
index 0000000..89f7627
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+  * Provides methods to turn a [[DataStream]] into a [[Table]].
+  *
+  * @param dataStream The [[DataStream]] to convert.
+  * @param inputType The [[TypeInformation]] of the [[DataStream]]'s type (unused in this class).
+  * @tparam T The type of the [[DataStream]].
+  */
+class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
+
+  /**
+    * Turns the [[DataStream]] into a [[Table]].
+    *
+    * The field names of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(String, Int)] = ...
+    *   val table = stream.toTable(tEnv, 'name, 'amount)
+    * }}}
+    *
+    * When no fields are given, the field names are automatically extracted from the type of
+    * the [[DataStream]].
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {
+    if (fields.nonEmpty) {
+      tableEnv.fromDataStream(dataStream, fields: _*)
+    } else {
+      tableEnv.fromDataStream(dataStream)
+    }
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
new file mode 100644
index 0000000..1e6749e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableEnvironment, Table, TableConfig}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.asScalaStream
+
+/**
+  * Scala-specific [[TableEnvironment]] for a [[StreamExecutionEnvironment]].
+  *
+  * A TableEnvironment can be used to:
+  * - turn a [[DataStream]] into a [[Table]]
+  * - add a [[DataStream]] to the catalog of the [[TableEnvironment]]
+  * - add a [[Table]] to the catalog of the [[TableEnvironment]]
+  * - scan a registered table to obtain a [[Table]]
+  * - run a SQL query over registered tables to obtain a [[Table]]
+  * - turn a [[Table]] back into a [[DataStream]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
+  * @param config The configuration of the TableEnvironment.
+  */
+class StreamTableEnvironment(
+    execEnv: StreamExecutionEnvironment,
+    config: TableConfig)
+  extends org.apache.flink.table.api.StreamTableEnvironment(
+    execEnv.getWrappedStreamExecutionEnvironment,
+    config) {
+
+  /**
+    * Creates a [[Table]] from the given [[DataStream]] by registering the stream under a
+    * generated unique table name and ingesting that name.
+    * The field names of the [[Table]] are automatically derived from the type of the
+    * [[DataStream]].
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T]): Table = {
+    // Register the underlying Java DataStream under a generated name, then ingest that name.
+    val tableName = createUniqueTableName()
+    registerDataStreamInternal(tableName, dataStream.javaStream)
+    ingest(tableName)
+  }
+
+  /**
+    * Creates a [[Table]] from the given [[DataStream]], naming its fields as specified.
+    *
+    * Example:
+    *
+    * {{{
+    *   val stream: DataStream[(String, Long)] = ...
+    *   val tab: Table = tableEnv.fromDataStream(stream, 'a, 'b)
+    * }}}
+    *
+    * @param dataStream The [[DataStream]] to be converted.
+    * @param fields The field names of the resulting [[Table]].
+    * @tparam T The type of the [[DataStream]].
+    * @return The converted [[Table]].
+    */
+  def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
+    // Same as the single-argument variant, but the field expressions are passed along.
+    val tableName = createUniqueTableName()
+    registerDataStreamInternal(tableName, dataStream.javaStream, fields.toArray)
+    ingest(tableName)
+  }
+
+  /**
+    * Adds the given [[DataStream]] as a table to the catalog of the [[TableEnvironment]]
+    * once the name has passed the table name check.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * The field names of the [[Table]] are automatically derived
+    * from the type of the [[DataStream]].
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
+    // User-supplied names are checked first; generated names in fromDataStream skip this step.
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream.javaStream)
+  }
+
+  /**
+    * Adds the given [[DataStream]] as a table with the specified field names to the catalog
+    * of the [[TableEnvironment]].
+    * Registered tables can be referenced in SQL queries.
+    *
+    * Example:
+    *
+    * {{{
+    *   val stream: DataStream[(String, Long)] = ...
+    *   tableEnv.registerDataStream("myTable", stream, 'a, 'b)
+    * }}}
+    *
+    * @param name The name under which the [[DataStream]] is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register.
+    * @param fields The field names of the registered table.
+    * @tparam T The type of the [[DataStream]] to register.
+    */
+  def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = {
+    // User-supplied names are checked first; generated names in fromDataStream skip this step.
+    checkValidTableName(name)
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
+  }
+
+  /**
+    * Translates the given [[Table]] into a [[DataStream]] of the requested type.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * types: Fields are mapped by position, field types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toDataStream[T: TypeInformation](table: Table): DataStream[T] = {
+    asScalaStream(translate(table))
+  }
+
+  /**
+    * Adds a [[TableFunction]] to the TableEnvironment's catalog under the given name.
+    * Registered functions can be referenced in SQL queries.
+    * @param name The name under which the function is registered.
+    * @param tf The TableFunction to register.
+    * @tparam T The type of the output row.
+    */
+  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
+    registerTableFunctionInternal(name, tf)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
new file mode 100644
index 0000000..2a0d571
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table, TableException}
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
+import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
+
+/**
+  * Provides methods to turn a [[Table]] into a [[DataSet]] or a [[DataStream]].
+  *
+  * @param table The table to convert.
+  */
+class TableConversions(table: Table) {
+
+  /** Converts the [[Table]] to a [[DataSet]]; requires a Scala batch table environment. */
+  def toDataSet[T: TypeInformation]: DataSet[T] = {
+    // Dispatch on the runtime type of the environment the table belongs to.
+    table.tableEnv match {
+      case batchEnv: ScalaBatchTableEnv =>
+        batchEnv.toDataSet(table)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
+    }
+  }
+
+  /** Converts the [[Table]] to a [[DataStream]]; requires a Scala stream table environment. */
+  def toDataStream[T: TypeInformation]: DataStream[T] = {
+    // Dispatch on the runtime type of the environment the table belongs to.
+    table.tableEnv match {
+      case streamEnv: ScalaStreamTableEnv =>
+        streamEnv.toDataStream(table)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
+}
+