You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:58 UTC

[29/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
deleted file mode 100644
index b516745..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import java.util.Objects
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer}
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo.instantiateComparator
-import org.apache.flink.util.Preconditions._
-
-/**
-  * [[TypeInformation]] for SQL INTERVAL types.
-  *
-  * An instance is fully described by the class of its values, the serializer handed out by
-  * `createSerializer` and the comparator class that `createComparator` instantiates.
-  * None of the three constructor arguments may be null.
-  *
-  * @param clazz class of the values described by this type information
-  * @param serializer serializer instance returned as-is by `createSerializer`
-  * @param comparatorClass comparator class instantiated on every `createComparator` call
-  * @tparam T type of the interval values
-  */
-@SerialVersionUID(-1816179424364825258L)
-class TimeIntervalTypeInfo[T](
-    val clazz: Class[T],
-    val serializer: TypeSerializer[T],
-    val comparatorClass: Class[_ <: TypeComparator[T]])
-  extends TypeInformation[T]
-  with AtomicType[T] {
-
-  // fail fast on missing constructor arguments
-  checkNotNull(clazz)
-  checkNotNull(serializer)
-  checkNotNull(comparatorClass)
-
-  // ---------------------------------------------------------------------------------------------
-  // Static description of the type: a single, non-basic, non-tuple field usable as a key.
-  // ---------------------------------------------------------------------------------------------
-
-  override def getTypeClass: Class[T] = clazz
-
-  override def getArity: Int = 1
-
-  override def getTotalFields: Int = 1
-
-  override def isBasicType: Boolean = false
-
-  override def isTupleType: Boolean = false
-
-  override def isKeyType: Boolean = true
-
-  // ---------------------------------------------------------------------------------------------
-  // Serializer / comparator access
-  // ---------------------------------------------------------------------------------------------
-
-  /** Returns the serializer passed to the constructor; `config` is not consulted. */
-  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
-
-  /**
-    * Creates a new comparator of `comparatorClass` for the requested sort order.
-    * `executionConfig` is not consulted.
-    */
-  override def createComparator(
-      sortOrderAscending: Boolean,
-      executionConfig: ExecutionConfig)
-    : TypeComparator[T] = instantiateComparator(comparatorClass, sortOrderAscending)
-
-  // ---------------------------------------------------------------------------------------------
-  // Equality is defined over (clazz, serializer, comparatorClass).
-  // ---------------------------------------------------------------------------------------------
-
-  def canEqual(obj: Any): Boolean = obj.isInstanceOf[TimeIntervalTypeInfo[_]]
-
-  override def equals(obj: Any): Boolean = obj match {
-    // classes are compared by reference, the serializer by its own equals()
-    case that: TimeIntervalTypeInfo[_] if that.canEqual(this) =>
-      (clazz eq that.clazz) &&
-        serializer == that.serializer &&
-        (comparatorClass eq that.comparatorClass)
-    case _ =>
-      false
-  }
-
-  override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass)
-
-  override def toString: String = s"TimeIntervalTypeInfo(${clazz.getSimpleName})"
-}
-
-object TimeIntervalTypeInfo {
-
-  /**
-    * Interval type backed by `java.lang.Integer` values.
-    * NOTE(review): presumably a number of months, judging by the name -- confirm.
-    */
-  val INTERVAL_MONTHS: TimeIntervalTypeInfo[java.lang.Integer] =
-    new TimeIntervalTypeInfo(
-      classOf[java.lang.Integer],
-      IntSerializer.INSTANCE,
-      classOf[IntComparator])
-
-  /**
-    * Interval type backed by `java.lang.Long` values.
-    * NOTE(review): presumably a number of milliseconds, judging by the name -- confirm.
-    */
-  val INTERVAL_MILLIS: TimeIntervalTypeInfo[java.lang.Long] =
-    new TimeIntervalTypeInfo(
-      classOf[java.lang.Long],
-      LongSerializer.INSTANCE,
-      classOf[LongComparator])
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Reflectively creates a comparator through its single-`boolean` constructor.
-    *
-    * @param comparatorClass comparator class; must expose a public constructor taking a
-    *                        primitive boolean
-    * @param ascendingOrder value passed to that constructor
-    * @return the newly created comparator
-    * @throws RuntimeException wrapping any exception raised while looking up or invoking
-    *                          the constructor
-    */
-  private def instantiateComparator[X](
-      comparatorClass: Class[_ <: TypeComparator[X]],
-      ascendingOrder: java.lang.Boolean)
-    : TypeComparator[X] =
-    try {
-      comparatorClass
-        .getConstructor(java.lang.Boolean.TYPE)
-        .newInstance(ascendingOrder)
-    } catch {
-      case cause: Exception =>
-        val message = s"Could not initialize comparator ${comparatorClass.getName}"
-        throw new RuntimeException(message, cause)
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
deleted file mode 100644
index e30e273..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.table.validate._
-
-object TypeCheckUtils {
-
-  /**
-    * Checks if type information is an advanced type that can be converted to a
-    * SQL type but NOT vice versa.
-    *
-    * Everything that is neither a basic type, a SQL time type nor a time interval type
-    * counts as advanced.
-    */
-  def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: BasicTypeInfo[_] | _: SqlTimeTypeInfo[_] | _: TimeIntervalTypeInfo[_] => false
-    case _ => true
-  }
-
-  /**
-    * Checks if type information is a simple type that can be converted to a
-    * SQL type and vice versa. This is the exact negation of [[isAdvanced]].
-    */
-  def isSimple(dataType: TypeInformation[_]): Boolean = !isAdvanced(dataType)
-
-  /** True for every [[NumericTypeInfo]] and for the big decimal type. */
-  def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: NumericTypeInfo[_] | BIG_DEC_TYPE_INFO => true
-    case _ => false
-  }
-
-  /** True for time points as well as time intervals. */
-  def isTemporal(dataType: TypeInformation[_]): Boolean =
-    isTimePoint(dataType) || isTimeInterval(dataType)
-
-  /** True if the type is a [[SqlTimeTypeInfo]]. */
-  def isTimePoint(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: SqlTimeTypeInfo[_] => true
-    case _ => false
-  }
-
-  /** True if the type is a [[TimeIntervalTypeInfo]]. */
-  def isTimeInterval(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: TimeIntervalTypeInfo[_] => true
-    case _ => false
-  }
-
-  // The following checks compare against one specific basic type each.
-
-  def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
-
-  def isBoolean(dataType: TypeInformation[_]): Boolean = dataType == BOOLEAN_TYPE_INFO
-
-  def isDecimal(dataType: TypeInformation[_]): Boolean = dataType == BIG_DEC_TYPE_INFO
-
-  def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
-
-  /** True for object arrays and primitive arrays. */
-  def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: ObjectArrayTypeInfo[_, _] => true
-    case _: PrimitiveArrayTypeInfo[_] => true
-    case _ => false
-  }
-
-  /** True if the type class implements `Comparable` and the type is not an array. */
-  def isComparable(dataType: TypeInformation[_]): Boolean =
-    classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
-
-  /**
-    * Validates that `dataType` is numeric (see [[isNumeric]]).
-    *
-    * @param dataType type to check
-    * @param caller name used as the prefix of the failure message
-    * @return `ValidationSuccess` for numeric types, a `ValidationFailure` otherwise
-    */
-  def assertNumericExpr(
-      dataType: TypeInformation[_],
-      caller: String)
-    : ValidationResult = {
-    if (isNumeric(dataType)) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"$caller requires numeric types, get $dataType here")
-    }
-  }
-
-  /**
-    * Validates that `dataType` reports itself as a sort key type.
-    *
-    * @param dataType type to check
-    * @param caller name used as the prefix of the failure message
-    * @return `ValidationSuccess` for sort key types, a `ValidationFailure` otherwise
-    */
-  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ValidationResult = {
-    if (!dataType.isSortKeyType) {
-      ValidationFailure(s"$caller requires orderable types, get $dataType here")
-    } else {
-      ValidationSuccess
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
deleted file mode 100644
index 23154a5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
-
-/**
-  * Utilities for type conversions.
-  */
-object TypeCoercion {
-
-  /**
-    * Numeric types in widening order: [[widerTypeOf]] picks the entry with the higher index
-    * and [[canSafelyCast]] only allows moving towards a higher index.
-    */
-  val numericWideningPrecedence: IndexedSeq[TypeInformation[_]] =
-    IndexedSeq(
-      BYTE_TYPE_INFO,
-      SHORT_TYPE_INFO,
-      INT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      FLOAT_TYPE_INFO,
-      DOUBLE_TYPE_INFO)
-
-  /** Whether `tp` is one of the entries of [[numericWideningPrecedence]]. */
-  private def inWideningOrder(tp: TypeInformation[_]): Boolean =
-    numericWideningPrecedence.contains(tp)
-
-  /**
-    * Finds the type both arguments can be widened to.
-    *
-    * Equal types yield that type; a string on either side yields string; otherwise a big
-    * decimal on either side yields big decimal; a time point combined with a time interval
-    * yields the time point; two entries of [[numericWideningPrecedence]] yield the one with
-    * the higher index.
-    *
-    * @return the wider type, or `None` if none of the rules applies
-    */
-  def widerTypeOf(
-      tp1: TypeInformation[_],
-      tp2: TypeInformation[_])
-    : Option[TypeInformation[_]] = (tp1, tp2) match {
-    case (left, right) if left == right => Some(left)
-
-    case (_, STRING_TYPE_INFO) | (STRING_TYPE_INFO, _) => Some(STRING_TYPE_INFO)
-
-    case (_, BIG_DEC_TYPE_INFO) | (BIG_DEC_TYPE_INFO, _) => Some(BIG_DEC_TYPE_INFO)
-
-    case (timePoint: SqlTimeTypeInfo[_], _: TimeIntervalTypeInfo[_]) => Some(timePoint)
-    case (_: TimeIntervalTypeInfo[_], timePoint: SqlTimeTypeInfo[_]) => Some(timePoint)
-
-    case _ if inWideningOrder(tp1) && inWideningOrder(tp2) =>
-      val widestIdx = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
-      Some(numericWideningPrecedence(widestIdx))
-
-    case _ => None
-  }
-
-  /**
-    * Tests whether a cast can be done safely, i.e. without loss of information.
-    *
-    * Anything can be cast to string, numeric types can be cast to big decimal, and an entry
-    * of [[numericWideningPrecedence]] can be cast to any entry with a strictly higher index.
-    */
-  def canSafelyCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
-    case (_, STRING_TYPE_INFO) | (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
-
-    case _ if inWideningOrder(from) && inWideningOrder(to) =>
-      numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)
-
-    case _ => false
-  }
-
-  /**
-    * All the supported cast types in flink-table.
-    * Note: This may lose information during the cast.
-    *
-    * The cases are evaluated top to bottom, so the explicit `false` cases only apply to
-    * pairs not accepted by an earlier case.
-    */
-  def canCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
-    case (fromTp, toTp) if fromTp == toTp => true
-
-    case (_, STRING_TYPE_INFO) => true
-
-    case (_, CHAR_TYPE_INFO) => false // Character type not supported.
-
-    // from string
-    case (STRING_TYPE_INFO, _: NumericTypeInfo[_]) |
-         (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) |
-         (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) |
-         (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) |
-         (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) |
-         (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
-
-    // between boolean and numeric / decimal
-    case (BOOLEAN_TYPE_INFO, _: NumericTypeInfo[_]) |
-         (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) |
-         (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) |
-         (BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
-
-    // between numeric / decimal types, and from int / long to temporal types
-    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) |
-         (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_]) |
-         (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) |
-         (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) |
-         (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) |
-         (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) |
-         (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
-         (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
-
-    // date <-> time is rejected before the general time point rule below
-    case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) |
-         (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false
-
-    // between time points, and from temporal types back to int / long
-    case (_: SqlTimeTypeInfo[_], _: SqlTimeTypeInfo[_]) |
-         (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) |
-         (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) |
-         (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) |
-         (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) |
-         (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true
-
-    case _ => false
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
deleted file mode 100644
index a81577c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
-import org.apache.flink.types.Row
-
-import scala.collection.JavaConversions._
-
-object TypeConverter {
-
-  /** A [[RowTypeInfo]] created without field types, viewed as `TypeInformation[Any]`. */
-  val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
-
-  /**
-    * Determines the return type of Flink operators based on the logical fields, the expected
-    * physical type and configuration parameters.
-    *
-    * For example:
-    *   - No physical type expected, only 3 non-null fields and efficient type usage enabled
-    *       -> return Tuple3
-    *   - No physical type expected, efficient type usage enabled, but 3 nullable fields
-    *       -> return Row because Tuple does not support null values
-    *   - Physical type expected
-    *       -> check if physical type is compatible and return it
-    *
-    * @param logicalRowType logical row information
-    * @param expectedPhysicalType expected physical type
-    * @param nullable fields can be nullable
-    * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
-    * @return suitable return type
-    * @throws TableException if an expected (non-Row) physical type differs from the logical
-    *                        fields in arity, field names or field types, or is of an
-    *                        unsupported kind
-    */
-  def determineReturnType(
-      logicalRowType: RelDataType,
-      expectedPhysicalType: Option[TypeInformation[Any]],
-      nullable: Boolean,
-      useEfficientTypes: Boolean)
-    : TypeInformation[Any] = {
-
-    // shared failure for every "field type differs from expected type" situation
-    def typeMismatch(expected: Any, actual: Any): Nothing =
-      throw new TableException(s"Result field does not match expected type. " +
-        s"Expected: $expected; Actual: $actual")
-
-    // type information and names of the logical fields
-    val fieldTypes = logicalRowType.getFieldList.map { field =>
-      FlinkTypeFactory.toTypeInfo(field.getType)
-    }
-    val fieldNames = logicalRowType.getFieldNames.toList
-
-    val returnType = expectedPhysicalType match {
-
-      // no physical type
-      // determine type based on logical fields and configuration parameters
-      case None =>
-        // Row is required if efficient types are disabled, the row is wider than the
-        // largest tuple, or fields may be null
-        val rowRequired =
-          !useEfficientTypes || fieldTypes.length > Tuple.MAX_ARITY || nullable
-        if (rowRequired) {
-          new RowTypeInfo(fieldTypes: _*)
-        } else if (fieldTypes.length == 1) {
-          // a single field is returned as its own (atomic) type
-          fieldTypes.head
-        } else {
-          new TupleTypeInfo[Tuple](fieldTypes.toArray:_*)
-        }
-
-      // Row is expected, create the arity for it
-      case Some(expected) if expected.getTypeClass == classOf[Row] =>
-        new RowTypeInfo(fieldTypes: _*)
-
-      // a certain physical type is expected (but not Row)
-      // check if expected physical type is compatible with logical field type
-      case Some(expected) =>
-        if (expected.getArity != fieldTypes.length) {
-          throw new TableException("Arity of result does not match expected type.")
-        }
-        expected match {
-
-          // POJO type expected: fields are matched by name
-          case pojo: PojoTypeInfo[_] =>
-            for ((fieldName, fieldType) <- fieldNames.zip(fieldTypes)) {
-              val pojoIdx = pojo.getFieldIndex(fieldName)
-              if (pojoIdx < 0) {
-                throw new TableException(s"POJO does not define field name: $fieldName")
-              }
-              val declaredType = pojo.getTypeAt(pojoIdx)
-              if (fieldType != declaredType) {
-                typeMismatch(declaredType, fieldType)
-              }
-            }
-
-          // Tuple/Case class type expected: fields are matched by position
-          case composite: CompositeType[_] =>
-            for ((fieldType, position) <- fieldTypes.zipWithIndex) {
-              val declaredType = composite.getTypeAt(position)
-              if (fieldType != declaredType) {
-                typeMismatch(declaredType, fieldType)
-              }
-            }
-
-          // Atomic type expected: compared against the first logical field
-          case atomic: AtomicType[_] =>
-            val fieldType = fieldTypes.head
-            if (fieldType != atomic) {
-              typeMismatch(atomic, fieldType)
-            }
-
-          case _ =>
-            throw new TableException("Unsupported result type.")
-        }
-        expected
-    }
-    returnType.asInstanceOf[TypeInformation[Any]]
-  }
-
-  /** Maps a Calcite join type to the corresponding Flink join type. */
-  def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
-    case JoinRelType.INNER => JoinType.INNER
-    case JoinRelType.LEFT => JoinType.LEFT_OUTER
-    case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
-    case JoinRelType.FULL => JoinType.FULL_OUTER
-  }
-
-  /** Maps a Flink join type to the corresponding Calcite join type. */
-  def flinkJoinTypeToRelType(joinType: JoinType): JoinRelType = joinType match {
-    case JoinType.INNER => JoinRelType.INNER
-    case JoinType.LEFT_OUTER => JoinRelType.LEFT
-    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
-    case JoinType.FULL_OUTER => JoinRelType.FULL
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
deleted file mode 100644
index 8e409cc..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.validate
-
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
-import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
-import org.apache.flink.api.table.ValidationException
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.functions.utils.{TableSqlFunction, UserDefinedFunctionUtils}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
-
-/**
-  * A catalog for looking up (user-defined) functions, used during validation phases
-  * of both Table API and SQL API.
-  *
-  * Two registries are kept: `functionBuilders` maps lower-cased function names to the class
-  * that implements them, and `sqlFunctions` holds the SQL functions exposed through
-  * [[getSqlOperatorTable]].
-  */
-class FunctionCatalog {
-
-  private val functionBuilders = mutable.HashMap.empty[String, Class[_]]
-  private val sqlFunctions = mutable.ListBuffer[SqlFunction]()
-
-  /**
-    * Registers `builder` under the lower-cased `name`, replacing any previous entry.
-    */
-  def registerFunction(name: String, builder: Class[_]): Unit =
-    functionBuilders.put(name.toLowerCase, builder)
-
-  /**
-    * Registers a SQL function, replacing all previously registered SQL functions that
-    * have exactly the same name.
-    */
-  def registerSqlFunction(sqlFunction: SqlFunction): Unit = {
-    sqlFunctions --= sqlFunctions.filter(_.getName == sqlFunction.getName)
-    sqlFunctions += sqlFunction
-  }
-
-  /**
-    * Register multiple SQL functions at the same time. The functions have the same name.
-    *
-    * Previously registered SQL functions of that name are replaced. An empty sequence is
-    * a no-op.
-    *
-    * @throws ValidationException if the given functions do not all share one name
-    */
-  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
-    if (functions.nonEmpty) {
-      val name = functions.head.getName
-      // check that all functions have the same name
-      if (functions.forall(_.getName == name)) {
-        sqlFunctions --= sqlFunctions.filter(_.getName == name)
-        sqlFunctions ++= functions
-      } else {
-        throw ValidationException("The SQL functions to be registered have different names.")
-      }
-    }
-  }
-
-  /**
-    * Returns an operator table that chains a new [[BasicOperatorTable]] with the
-    * registered SQL functions.
-    */
-  def getSqlOperatorTable: SqlOperatorTable =
-    ChainedSqlOperatorTable.of(
-      new BasicOperatorTable(),
-      new ListSqlOperatorTable(sqlFunctions)
-    )
-
-  /**
-    * Extracts a meaningful message from a failure raised while instantiating a function.
-    *
-    * Reflective constructor calls wrap the exception thrown by the constructor in an
-    * `InvocationTargetException`, whose own message is usually null. In that case the
-    * wrapped cause is reported instead. If no message is available at all, the
-    * exception's `toString` is used so that the result is never null.
-    */
-  private def failureMessage(error: Throwable): String = {
-    val root = error match {
-      case wrapped: java.lang.reflect.InvocationTargetException if wrapped.getCause != null =>
-        wrapped.getCause
-      case other =>
-        other
-    }
-    Option(root.getMessage).getOrElse(root.toString)
-  }
-
-  /**
-    * Lookup and create an expression if we find a match.
-    *
-    * @param name function name; looked up case-insensitively
-    * @param children argument expressions of the call
-    * @return the expression representing the function call
-    * @throws ValidationException if no function is registered under `name`, if the function
-    *                             cannot be instantiated with the given arguments, or if the
-    *                             registered class is of an unsupported kind
-    */
-  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
-    val funcClass = functionBuilders
-      .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined function: $name"))
-
-    // Instantiate a function using the provided `children`
-    funcClass match {
-
-      // user-defined scalar function call
-      case sf if classOf[ScalarFunction].isAssignableFrom(sf) =>
-        Try(UserDefinedFunctionUtils.instantiate(sf.asInstanceOf[Class[ScalarFunction]])) match {
-          case Success(scalarFunction) => ScalarFunctionCall(scalarFunction, children)
-          case Failure(e) => throw ValidationException(failureMessage(e))
-        }
-
-      // user-defined table function call
-      case tf if classOf[TableFunction[_]].isAssignableFrom(tf) =>
-        // the table function itself is taken from the matching registered SQL function
-        val tableSqlFunction = sqlFunctions
-          .find(f => f.getName.equalsIgnoreCase(name) && f.isInstanceOf[TableSqlFunction])
-          .getOrElse(throw ValidationException(s"Undefined table function: $name"))
-          .asInstanceOf[TableSqlFunction]
-        val typeInfo = tableSqlFunction.getRowTypeInfo
-        val function = tableSqlFunction.getTableFunction
-        TableFunctionCall(name, function, children, typeInfo)
-
-      // general expression call
-      case expression if classOf[Expression].isAssignableFrom(expression) =>
-        // try to find a constructor accepts `Seq[Expression]`
-        Try(funcClass.getDeclaredConstructor(classOf[Seq[_]])) match {
-          case Success(seqCtor) =>
-            Try(seqCtor.newInstance(children).asInstanceOf[Expression]) match {
-              case Success(expr) => expr
-              case Failure(e) => throw ValidationException(failureMessage(e))
-            }
-          case Failure(_) =>
-            val childrenClass = Seq.fill(children.length)(classOf[Expression])
-            // try to find a constructor matching the exact number of children
-            Try(funcClass.getDeclaredConstructor(childrenClass: _*)) match {
-              case Success(ctor) =>
-                Try(ctor.newInstance(children: _*).asInstanceOf[Expression]) match {
-                  case Success(expr) => expr
-                  case Failure(e) => throw ValidationException(failureMessage(e))
-                }
-              case Failure(_) =>
-                throw ValidationException(s"Invalid number of arguments for function $funcClass")
-            }
-        }
-
-      case _ =>
-        throw ValidationException("Unsupported function.")
-    }
-  }
-
-  /**
-    * Drop a function and return if the function existed.
-    *
-    * NOTE(review): only the name -> class registry is updated; SQL functions registered
-    * under the same name stay in place. Confirm that this is intended.
-    */
-  def dropFunction(name: String): Boolean =
-    functionBuilders.remove(name.toLowerCase).isDefined
-
-  /**
-    * Drop all registered functions.
-    *
-    * NOTE(review): only the name -> class registry is cleared; registered SQL functions
-    * are kept. Confirm that this is intended.
-    */
-  def clear(): Unit = functionBuilders.clear()
-}
-
-object FunctionCatalog {
-
-  /**
-    * Built-in functions, keyed by the name under which [[withBuiltIns]] registers them
-    * and mapped to the expression class that implements them.
-    */
-  val builtInFunctions: Map[String, Class[_]] = Map(
-    // logic
-    "isNull" -> classOf[IsNull],
-    "isNotNull" -> classOf[IsNotNull],
-    "isTrue" -> classOf[IsTrue],
-    "isFalse" -> classOf[IsFalse],
-    "isNotTrue" -> classOf[IsNotTrue],
-    "isNotFalse" -> classOf[IsNotFalse],
-
-    // aggregate functions
-    "avg" -> classOf[Avg],
-    "count" -> classOf[Count],
-    "max" -> classOf[Max],
-    "min" -> classOf[Min],
-    "sum" -> classOf[Sum],
-
-    // string functions
-    "charLength" -> classOf[CharLength],
-    "initCap" -> classOf[InitCap],
-    "like" -> classOf[Like],
-    "lowerCase" -> classOf[Lower],
-    "similar" -> classOf[Similar],
-    "substring" -> classOf[Substring],
-    "trim" -> classOf[Trim],
-    "upperCase" -> classOf[Upper],
-    "position" -> classOf[Position],
-    "overlay" -> classOf[Overlay],
-
-    // math functions
-    "abs" -> classOf[Abs],
-    "ceil" -> classOf[Ceil],
-    "exp" -> classOf[Exp],
-    "floor" -> classOf[Floor],
-    "log10" -> classOf[Log10],
-    "ln" -> classOf[Ln],
-    "power" -> classOf[Power],
-    "mod" -> classOf[Mod],
-    "sqrt" -> classOf[Sqrt],
-
-    // temporal functions
-    "extract" -> classOf[Extract],
-    "currentDate" -> classOf[CurrentDate],
-    "currentTime" -> classOf[CurrentTime],
-    "currentTimestamp" -> classOf[CurrentTimestamp],
-    "localTime" -> classOf[LocalTime],
-    "localTimestamp" -> classOf[LocalTimestamp],
-    "quarter" -> classOf[Quarter],
-    "temporalOverlaps" -> classOf[TemporalOverlaps],
-
-    // array
-    "cardinality" -> classOf[ArrayCardinality],
-    "at" -> classOf[ArrayElementAt],
-    "element" -> classOf[ArrayElement]
-
-    // TODO implement function overloading here
-    // "floor" -> classOf[TemporalFloor]
-    // "ceil" -> classOf[TemporalCeil]
-  )
-
-  /**
-    * Create a new function catalog with built-in functions.
-    *
-    * Every entry of [[builtInFunctions]] is registered in a fresh catalog, so each call
-    * returns an independent instance.
-    */
-  def withBuiltIns: FunctionCatalog = {
-    val builtInCatalog = new FunctionCatalog()
-    for ((functionName, functionClass) <- builtInFunctions) {
-      builtInCatalog.registerFunction(functionName, functionClass)
-    }
-    builtInCatalog
-  }
-}
-
-class BasicOperatorTable extends ReflectiveSqlOperatorTable {
-
-  /**
-    * List of supported SQL operators / functions.
-    *
-    * This list should be kept in sync with [[SqlStdOperatorTable]].
-    *
-    * Every entry is passed to `register` while an instance of this class is constructed
-    * (see the `foreach` call after the list), in the order in which it is listed here.
-    * The inline comments only group the entries by kind of operator.
-    */
-  private val builtInSqlOperators: Seq[SqlOperator] = Seq(
-    // SET OPERATORS
-    SqlStdOperatorTable.UNION,
-    SqlStdOperatorTable.UNION_ALL,
-    SqlStdOperatorTable.EXCEPT,
-    SqlStdOperatorTable.EXCEPT_ALL,
-    SqlStdOperatorTable.INTERSECT,
-    SqlStdOperatorTable.INTERSECT_ALL,
-    // BINARY OPERATORS
-    SqlStdOperatorTable.AND,
-    SqlStdOperatorTable.AS,
-    SqlStdOperatorTable.CONCAT,
-    SqlStdOperatorTable.DIVIDE,
-    SqlStdOperatorTable.DIVIDE_INTEGER,
-    SqlStdOperatorTable.DOT,
-    SqlStdOperatorTable.EQUALS,
-    SqlStdOperatorTable.GREATER_THAN,
-    SqlStdOperatorTable.IS_DISTINCT_FROM,
-    SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
-    SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
-    SqlStdOperatorTable.LESS_THAN,
-    SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
-    SqlStdOperatorTable.MINUS,
-    SqlStdOperatorTable.MULTIPLY,
-    SqlStdOperatorTable.NOT_EQUALS,
-    SqlStdOperatorTable.OR,
-    SqlStdOperatorTable.PLUS,
-    SqlStdOperatorTable.DATETIME_PLUS,
-    // POSTFIX OPERATORS
-    SqlStdOperatorTable.DESC,
-    SqlStdOperatorTable.NULLS_FIRST,
-    SqlStdOperatorTable.IS_NOT_NULL,
-    SqlStdOperatorTable.IS_NULL,
-    SqlStdOperatorTable.IS_NOT_TRUE,
-    SqlStdOperatorTable.IS_TRUE,
-    SqlStdOperatorTable.IS_NOT_FALSE,
-    SqlStdOperatorTable.IS_FALSE,
-    SqlStdOperatorTable.IS_NOT_UNKNOWN,
-    SqlStdOperatorTable.IS_UNKNOWN,
-    // PREFIX OPERATORS
-    SqlStdOperatorTable.NOT,
-    SqlStdOperatorTable.UNARY_MINUS,
-    SqlStdOperatorTable.UNARY_PLUS,
-    // AGGREGATE OPERATORS
-    SqlStdOperatorTable.SUM,
-    SqlStdOperatorTable.COUNT,
-    SqlStdOperatorTable.MIN,
-    SqlStdOperatorTable.MAX,
-    SqlStdOperatorTable.AVG,
-    // ARRAY OPERATORS
-    SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
-    SqlStdOperatorTable.ITEM,
-    SqlStdOperatorTable.CARDINALITY,
-    SqlStdOperatorTable.ELEMENT,
-    // SPECIAL OPERATORS
-    SqlStdOperatorTable.ROW,
-    SqlStdOperatorTable.OVERLAPS,
-    SqlStdOperatorTable.LITERAL_CHAIN,
-    SqlStdOperatorTable.BETWEEN,
-    SqlStdOperatorTable.SYMMETRIC_BETWEEN,
-    SqlStdOperatorTable.NOT_BETWEEN,
-    SqlStdOperatorTable.SYMMETRIC_NOT_BETWEEN,
-    SqlStdOperatorTable.NOT_LIKE,
-    SqlStdOperatorTable.LIKE,
-    SqlStdOperatorTable.NOT_SIMILAR_TO,
-    SqlStdOperatorTable.SIMILAR_TO,
-    SqlStdOperatorTable.CASE,
-    SqlStdOperatorTable.REINTERPRET,
-    SqlStdOperatorTable.EXTRACT_DATE,
-    // FUNCTIONS
-    SqlStdOperatorTable.SUBSTRING,
-    SqlStdOperatorTable.OVERLAY,
-    SqlStdOperatorTable.TRIM,
-    SqlStdOperatorTable.POSITION,
-    SqlStdOperatorTable.CHAR_LENGTH,
-    SqlStdOperatorTable.CHARACTER_LENGTH,
-    SqlStdOperatorTable.UPPER,
-    SqlStdOperatorTable.LOWER,
-    SqlStdOperatorTable.INITCAP,
-    SqlStdOperatorTable.POWER,
-    SqlStdOperatorTable.SQRT,
-    SqlStdOperatorTable.MOD,
-    SqlStdOperatorTable.LN,
-    SqlStdOperatorTable.LOG10,
-    SqlStdOperatorTable.ABS,
-    SqlStdOperatorTable.EXP,
-    SqlStdOperatorTable.NULLIF,
-    SqlStdOperatorTable.COALESCE,
-    SqlStdOperatorTable.FLOOR,
-    SqlStdOperatorTable.CEIL,
-    SqlStdOperatorTable.LOCALTIME,
-    SqlStdOperatorTable.LOCALTIMESTAMP,
-    SqlStdOperatorTable.CURRENT_TIME,
-    SqlStdOperatorTable.CURRENT_TIMESTAMP,
-    SqlStdOperatorTable.CURRENT_DATE,
-    SqlStdOperatorTable.CAST,
-    SqlStdOperatorTable.EXTRACT,
-    SqlStdOperatorTable.QUARTER,
-    SqlStdOperatorTable.SCALAR_QUERY,
-    SqlStdOperatorTable.EXISTS
-  )
-
-  builtInSqlOperators.foreach(register)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
deleted file mode 100644
index 5cc7d03..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/ValidationResult.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.validate
-
-/**
-  * Represents the result of a validation.
-  */
-sealed trait ValidationResult {
-  def isFailure: Boolean = !isSuccess
-  def isSuccess: Boolean
-
-  /**
-    * Allows constructing a cascade of validation results.
-    * The first failure result will be returned.
-    */
-  def orElse(other: ValidationResult): ValidationResult = {
-    if (isSuccess) {
-      other
-    } else {
-      this
-    }
-  }
-}
-
-/**
-  * Represents the successful result of a validation.
-  */
-object ValidationSuccess extends ValidationResult {
-  val isSuccess: Boolean = true
-}
-
-/**
-  * Represents the failing result of a validation,
-  * with an error message describing the reason for the failure.
-  */
-case class ValidationFailure(message: String) extends ValidationResult {
-  val isSuccess: Boolean = false
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
deleted file mode 100644
index 5637d7a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
-import org.apache.flink.api.table.plan.logical._
-
-/**
-  * A group-window specification.
-  *
-  * Group-windows group rows based on time or row-count intervals and are therefore essentially a
-  * special type of groupBy. Just like groupBy, group-windows allow computing aggregates
-  * on groups of elements.
-  *
-  * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping
-  * is required to apply aggregations on streaming tables.
-  *
-  * For finite batch tables, group-windows provide shortcuts for time-based groupBy.
-  *
-  */
-trait GroupWindow {
-
-  /**
-    * Converts an API class to a logical window for planning.
-    */
-  private[flink] def toLogicalWindow: LogicalWindow
-}
-
-/**
-  * A group-window operating on event-time.
-  *
-  * @param timeField defines the time mode for streaming tables. For batch tables it defines the
-  *                  time attribute on which rows are grouped.
-  */
-abstract class EventTimeWindow(val timeField: Expression) extends GroupWindow {
-
-  protected var name: Option[Expression] = None
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): EventTimeWindow = {
-    this.name = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
-}
-
-// ------------------------------------------------------------------------------------------------
-// Tumbling group-windows
-// ------------------------------------------------------------------------------------------------
-
-/**
-  * Tumbling group-window.
-  *
-  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
-  * grouped by processing-time.
-  *
-  * @param size the size of the window either as time or row-count interval.
-  */
-class TumblingWindow(size: Expression) extends GroupWindow {
-
-  /**
-    * Tumbling group-window.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * @param size the size of the window either as time or row-count interval.
-    */
-  def this(size: String) = this(ExpressionParser.parseExpression(size))
-
-  private var alias: Option[Expression] = None
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a tumbling group-window on event-time
-    */
-  def on(timeField: Expression): TumblingEventTimeWindow =
-    new TumblingEventTimeWindow(alias, timeField, size)
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a tumbling group-window on event-time
-    */
-  def on(timeField: String): TumblingEventTimeWindow =
-    on(ExpressionParser.parseExpression(timeField))
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): TumblingWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): TumblingWindow = as(ExpressionParser.parseExpression(alias))
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    ProcessingTimeTumblingGroupWindow(alias, size)
-}
-
-/**
-  * Tumbling group-window on event-time.
-  */
-class TumblingEventTimeWindow(
-    alias: Option[Expression],
-    time: Expression,
-    size: Expression)
-  extends EventTimeWindow(time) {
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeTumblingGroupWindow(name.orElse(alias), time, size)
-}
-
-// ------------------------------------------------------------------------------------------------
-// Sliding group windows
-// ------------------------------------------------------------------------------------------------
-
-/**
-  * Partially specified sliding window.
-  *
-  * @param size the size of the window either as time or row-count interval.
-  */
-class SlideWithSize(size: Expression) {
-
-  /**
-    * Partially specified sliding window.
-    *
-    * @param size the size of the window either as time or row-count interval.
-    */
-  def this(size: String) = this(ExpressionParser.parseExpression(size))
-
-  /**
-    * Specifies the window's slide as time or row-count interval.
-    *
-    * The slide determines the interval in which windows are started. Hence, sliding windows can
-    * overlap if the slide is smaller than the size of the window.
-    *
-    * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this
-    * 15 minutes worth of elements are grouped every 3 minutes and each row contributes to 5
-    * windows.
-    *
-    * @param slide the slide of the window either as time or row-count interval.
-    * @return a sliding group-window
-    */
-  def every(slide: Expression): SlidingWindow = new SlidingWindow(size, slide)
-
-  /**
-    * Specifies the window's slide as time or row-count interval.
-    *
-    * The slide determines the interval in which windows are started. Hence, sliding windows can
-    * overlap if the slide is smaller than the size of the window.
-    *
-    * For example, you could have windows of size 15 minutes that slide by 3 minutes. With this
-    * 15 minutes worth of elements are grouped every 3 minutes and each row contributes to 5
-    * windows.
-    *
-    * @param slide the slide of the window either as time or row-count interval.
-    * @return a sliding group-window
-    */
-  def every(slide: String): SlidingWindow = every(ExpressionParser.parseExpression(slide))
-}
-
-/**
-  * Sliding group-window.
-  *
-  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
-  * grouped by processing-time.
-  *
-  * @param size the size of the window either as time or row-count interval.
-  */
-class SlidingWindow(
-    size: Expression,
-    slide: Expression)
-  extends GroupWindow {
-
-  private var alias: Option[Expression] = None
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a sliding group-window on event-time
-    */
-  def on(timeField: Expression): SlidingEventTimeWindow =
-    new SlidingEventTimeWindow(alias, timeField, size, slide)
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a sliding group-window on event-time
-    */
-  def on(timeField: String): SlidingEventTimeWindow =
-    on(ExpressionParser.parseExpression(timeField))
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SlidingWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SlidingWindow = as(ExpressionParser.parseExpression(alias))
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    ProcessingTimeSlidingGroupWindow(alias, size, slide)
-}
-
-/**
-  * Sliding group-window on event-time.
-  */
-class SlidingEventTimeWindow(
-    alias: Option[Expression],
-    timeField: Expression,
-    size: Expression,
-    slide: Expression)
-  extends EventTimeWindow(timeField) {
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSlidingGroupWindow(name.orElse(alias), timeField, size, slide)
-}
-
-// ------------------------------------------------------------------------------------------------
-// Session group windows
-// ------------------------------------------------------------------------------------------------
-
-/**
-  * Session group-window.
-  *
-  * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows are
-  * grouped by processing-time.
-  *
-  * @param gap the time interval of inactivity before a window is closed.
-  */
-class SessionWindow(gap: Expression) extends GroupWindow {
-
-  /**
-    * Session group-window.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * @param gap the time interval of inactivity before a window is closed.
-    */
-  def this(gap: String) = this(ExpressionParser.parseExpression(gap))
-
-  private var alias: Option[Expression] = None
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a session group-window on event-time
-    */
-  def on(timeField: Expression): SessionEventTimeWindow =
-    new SessionEventTimeWindow(alias, timeField, gap)
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows
-    * are grouped by processing-time.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param timeField time mode for streaming tables and time attribute for batch tables
-    * @return a session group-window on event-time
-    */
-  def on(timeField: String): SessionEventTimeWindow =
-    on(ExpressionParser.parseExpression(timeField))
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SessionWindow = {
-    this.alias = Some(alias)
-    this
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause can refer to in order
-    * to access window properties such as window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SessionWindow = as(ExpressionParser.parseExpression(alias))
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    ProcessingTimeSessionGroupWindow(alias, gap)
-}
-
-/**
-  * Session group-window on event-time.
-  */
-class SessionEventTimeWindow(
-    alias: Option[Expression],
-    timeField: Expression,
-    gap: Expression)
-  extends EventTimeWindow(timeField) {
-
-  override private[flink] def toLogicalWindow: LogicalWindow =
-    EventTimeSessionGroupWindow(name.orElse(alias), timeField, gap)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
deleted file mode 100644
index 63a5413..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-
-/**
-  * Simple example for demonstrating the use of SQL on a Stream Table.
-  *
-  * This example shows how to:
-  *  - Convert DataStreams to Tables
-  *  - Register a Table under a name
-  *  - Run a StreamSQL query on the registered Table
-  *
-  */
-object StreamSQLExample {
-
-  // *************************************************************************
-  //     PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-
-    // set up execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val orderA: DataStream[Order] = env.fromCollection(Seq(
-      Order(1L, "beer", 3),
-      Order(1L, "diaper", 4),
-      Order(3L, "rubber", 2)))
-
-    val orderB: DataStream[Order] = env.fromCollection(Seq(
-      Order(2L, "pen", 3),
-      Order(2L, "rubber", 3),
-      Order(4L, "beer", 1)))
-
-    // register the DataStreams under the names "OrderA" and "OrderB"
-    tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
-    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
-
-    // union the two tables
-    val result = tEnv.sql(
-      "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
-        "SELECT * FROM OrderB WHERE amount < 2")
-
-    result.toDataStream[Order].print()
-
-    env.execute()
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Order(user: Long, product: String, amount: Int)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
deleted file mode 100644
index 2ce2684..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamTableExample.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-
-/**
-  * Simple example for demonstrating the use of Table API on a Stream Table.
-  *
-  * This example shows how to:
-  *  - Convert DataStreams to Tables
-  *  - Apply union, select, and filter operations
-  *
-  */
-object StreamTableExample {
-
-  // *************************************************************************
-  //     PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-
-    // set up execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val orderA = env.fromCollection(Seq(
-      Order(1L, "beer", 3),
-      Order(1L, "diaper", 4),
-      Order(3L, "rubber", 2))).toTable(tEnv)
-
-    val orderB = env.fromCollection(Seq(
-      Order(2L, "pen", 3),
-      Order(2L, "rubber", 3),
-      Order(4L, "beer", 1))).toTable(tEnv)
-
-    // union the two tables
-    val result: DataStream[Order] = orderA.unionAll(orderB)
-      .select('user, 'product, 'amount)
-      .where('amount > 2)
-      .toDataStream[Order]
-
-    result.print()
-
-    env.execute()
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Order(user: Long, product: String, amount: Int)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
deleted file mode 100644
index a950988..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-
-/**
-  * This program implements a modified version of the TPC-H query 3. The
-  * example demonstrates how to assign names to fields when converting DataSets to Tables.
-  * The original query can be found at
-  * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
-  * (page 29).
-  *
-  * This program implements the following SQL equivalent:
-  *
-  * {{{
-  * SELECT
-  *      l_orderkey,
-  *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
-  *      o_orderdate,
-  *      o_shippriority
-  * FROM customer,
-  *      orders,
-  *      lineitem
-  * WHERE
-  *      c_mktsegment = '[SEGMENT]'
-  *      AND c_custkey = o_custkey
-  *      AND l_orderkey = o_orderkey
-  *      AND o_orderdate < date '[DATE]'
-  *      AND l_shipdate > date '[DATE]'
-  * GROUP BY
-  *      l_orderkey,
-  *      o_orderdate,
-  *      o_shippriority
-  * ORDER BY
-  *      revenue desc,
-  *      o_orderdate;
-  * }}}
-  *
-  * Input files are plain text CSV files using the pipe character ('|') as field separator
-  * as generated by the TPC-H data generator which is available at
-  * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
-  *
-  * Usage:
-  * {{{
-  * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
-  * }}}
-  *
-  * This example shows how to:
-  *  - Convert DataSets to Tables
-  *  - Use Table API expressions
-  *
-  */
-object TPCHQuery3Table {
-
-  // *************************************************************************
-  //     PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set filter date
-    val date = "1995-03-12".toDate
-
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val lineitems = getLineitemDataSet(env)
-      .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate)
-      .filter('shipDate.toDate > date)
-
-    val customers = getCustomerDataSet(env)
-      .toTable(tEnv, 'id, 'mktSegment)
-      .filter('mktSegment === "AUTOMOBILE")
-
-    val orders = getOrdersDataSet(env)
-      .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio)
-      .filter('orderDate.toDate < date)
-
-    val items =
-      orders.join(customers)
-        .where('custId === 'id)
-        .select('orderId, 'orderDate, 'shipPrio)
-      .join(lineitems)
-        .where('orderId === 'id)
-        .select(
-          'orderId,
-          'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,
-          'orderDate,
-          'shipPrio)
-
-    val result = items
-      .groupBy('orderId, 'orderDate, 'shipPrio)
-      .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio)
-      .orderBy('revenue.desc, 'orderDate.asc)
-
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-
-    // execute program
-    env.execute("Scala TPCH Query 3 (Table API Expression) Example")
-  }
-  
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-  
-  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
-  case class Customer(id: Long, mktSegment: String)
-  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-  
-  private var lineitemPath: String = _
-  private var customerPath: String = _
-  private var ordersPath: String = _
-  private var outputPath: String = _
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 4) {
-      lineitemPath = args(0)
-      customerPath = args(1)
-      ordersPath = args(2)
-      outputPath = args(3)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-          " Due to legal restrictions, we can not ship generated data.\n" +
-          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
-                             "<orders-csv path> <result path>")
-      false
-    }
-  }
-
-  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
-    env.readCsvFile[Lineitem](
-        lineitemPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 5, 6, 10) )
-  }
-
-  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
-    env.readCsvFile[Customer](
-        customerPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 6) )
-  }
-
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
-    env.readCsvFile[Order](
-        ordersPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 4, 7) )
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
deleted file mode 100644
index 96a603e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountSQL.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-
-/**
-  * Simple example that shows how the Batch SQL API is used in Scala.
-  *
-  * This example shows how to:
-  *  - Convert DataSets to Tables
-  *  - Register a Table under a name
-  *  - Run a SQL query on the registered Table
-  *
-  */
-object WordCountSQL {
-
-  // *************************************************************************
-  //     PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-
-    // register the DataSet as table "WordCount"
-    tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
-
-    // run a SQL query on the Table and retrieve the result as a new Table
-    val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
-
-    table.toDataSet[WC].print()
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class WC(word: String, frequency: Long)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
deleted file mode 100644
index 587a716..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-
-/**
-  * Simple example for demonstrating the use of the Table API for a Word Count in Scala.
-  *
-  * This example shows how to:
-  *  - Convert DataSets to Tables
-  *  - Apply group, aggregate, select, and filter operations
-  *
-  */
-object WordCountTable {
-
-  // *************************************************************************
-  //     PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-    val expr = input.toTable(tEnv)
-    val result = expr
-      .groupBy('word)
-      .select('word, 'frequency.sum as 'frequency)
-      .filter('frequency === 2)
-      .toDataSet[WC]
-
-    result.print()
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class WC(word: String, frequency: Long)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
new file mode 100644
index 0000000..59cad80
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql2rel.RelDecorrelator
+import org.apache.calcite.tools.{Programs, RuleSet}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.table.calcite.FlinkPlannerImpl
+import org.apache.flink.table.explain.PlanJsonParser
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetRel}
+import org.apache.flink.table.plan.rules.FlinkRuleSets
+import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
+
+/**
+  * The abstract base class for batch TableEnvironments.
+  *
+  * A TableEnvironment can be used to:
+  * - convert a [[DataSet]] to a [[Table]]
+  * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
+  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
+  * - scan a registered table to obtain a [[Table]]
+  * - specify a SQL query on registered tables to obtain a [[Table]]
+  * - convert a [[Table]] into a [[DataSet]]
+  * - explain the AST and execution plan of a [[Table]]
+  *
+  * @param execEnv The [[ExecutionEnvironment]] which is wrapped in this [[BatchTableEnvironment]].
+  * @param config The [[TableConfig]] of this [[BatchTableEnvironment]].
+  */
+abstract class BatchTableEnvironment(
+    private[flink] val execEnv: ExecutionEnvironment,
+    config: TableConfig)
+  extends TableEnvironment(config) {
+
+  // a counter for unique table names.
+  private val nameCntr: AtomicInteger = new AtomicInteger(0)
+
+  // the naming pattern for internally registered tables.
+  private val internalNamePattern = "^_DataSetTable_[0-9]+$".r
+
+  /**
+    * Checks if the chosen table name is valid.
+    *
+    * @param name The table name to check.
+    */
+  override protected def checkValidTableName(name: String): Unit = {
+    // names matching the internal pattern are reserved for internally registered tables
+    val clashesWithInternalName = internalNamePattern.findFirstIn(name).isDefined
+    if (clashesWithInternalName) {
+      throw new TableException(s"Illegal Table name. " +
+        s"Please choose a name that does not contain the pattern $internalNamePattern")
+    }
+    // every other name is accepted
+  }
+
+  /** Returns a unique table name according to the internal naming pattern. */
+  protected def createUniqueTableName(): String = s"_DataSetTable_${nameCntr.getAndIncrement()}"
+
+  /**
+    * Scans a registered table and returns the resulting [[Table]].
+    *
+    * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
+    *
+    * @param tableName The name of the table to scan.
+    * @throws TableException if no table is registered under the given name.
+    * @return The scanned table.
+    */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+    if (isRegistered(tableName)) {
+      new Table(this, CatalogNode(tableName, getRowType(tableName)))
+    } else {
+      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+    }
+  }
+
+  /**
+    * Registers an external [[BatchTableSource]] in this [[TableEnvironment]]'s catalog.
+    * Registered tables can be referenced in SQL queries.
+    *
+    * @param name The name under which the [[BatchTableSource]] is registered.
+    * @param tableSource The [[BatchTableSource]] to register.
+    */
+  def registerTableSource(name: String, tableSource: BatchTableSource[_]): Unit = {
+    // checkValidTableName throws for names that match the internal naming pattern
+    checkValidTableName(name)
+    registerTableInternal(name, new TableSourceTable(tableSource))
+  }
+
+  /**
+    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    *
+    * @param query The SQL query to evaluate.
+    * @return The result of the query as Table.
+    */
+  override def sql(query: String): Table = {
+    // a new planner instance is created for every query
+    val sqlPlanner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+
+    // parse -> validate -> convert to a relational tree
+    val sqlNode = sqlPlanner.parse(query)
+    val validatedNode = sqlPlanner.validate(sqlNode)
+    val relRoot = sqlPlanner.rel(validatedNode)
+    // wrap the relational tree into a logical node backing the resulting Table
+    val logicalNode = LogicalRelNode(relRoot.rel)
+    new Table(this, logicalNode)
+  }
+
+  /**
+    * Writes a [[Table]] to a [[TableSink]].
+    *
+    * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
+    * [[TableSink]] to write it.
+    *
+    * @param table The [[Table]] to write.
+    * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
+    */
+  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = {
+    // only a BatchTableSink can be handed a DataSet; any other sink is rejected below
+    sink match {
+      case dataSetSink: BatchTableSink[T] =>
+        // the sink dictates the type of the DataSet the Table is translated into
+        val sinkType = sink.getOutputType
+        val translated: DataSet[T] = translate(table)(sinkType)
+        dataSetSink.emitDataSet(translated)
+
+      case _ =>
+        throw new TableException("BatchTableSink required to emit batch Table")
+    }
+  }
+
+  /**
+    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+    * the result of the given [[Table]].
+    *
+    * @param table The table for which the AST and execution plan will be returned.
+    * @param extended Flag to include detailed optimizer estimates.
+    */
+  private[flink] def explain(table: Table, extended: Boolean): String = {
+    val logicalTree = table.getRelNode
+    val optimizedTree = optimize(logicalTree)
+    val rowType = TypeExtractor.createTypeInfo(classOf[Row])
+    val rows = translate[Row](optimizedTree)(rowType)
+
+    // a discarding sink is attached before the plan is requested
+    // NOTE(review): presumably getExecutionPlan needs at least one sink -- confirm
+    rows.output(new DiscardingOutputFormat[Row])
+    val jsonPlan = rows.getExecutionEnvironment.getExecutionPlan
+    val physicalPlan = PlanJsonParser.getSqlExecutionPlan(jsonPlan, extended)
+
+    // three sections, each title followed by its plan, joined by the line separator
+    Seq(
+      "== Abstract Syntax Tree ==",
+      RelOptUtil.toString(logicalTree),
+      "== Optimized Logical Plan ==",
+      RelOptUtil.toString(optimizedTree),
+      "== Physical Execution Plan ==",
+      physicalPlan).mkString(System.lineSeparator)
+  }
+
+  /**
+    * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+    * the result of the given [[Table]].
+    *
+    * @param table The table for which the AST and execution plan will be returned.
+    */
+  def explain(table: Table): String = explain(table, extended = false)
+
+  /**
+    * Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register as table in the catalog.
+    * @tparam T the type of the [[DataSet]].
+    */
+  protected def registerDataSetInternal[T](name: String, dataSet: DataSet[T]): Unit = {
+    // field names and positions are derived from the type information of the DataSet
+    val typeInfo = dataSet.getType
+    val fieldInfo = getFieldInfo[T](typeInfo)
+    val names = fieldInfo._1
+    val indexes = fieldInfo._2
+
+    val catalogTable = new DataSetTable[T](dataSet, indexes, names)
+    registerTableInternal(name, catalogTable)
+  }
+
+  /**
+    * Registers a [[DataSet]] as a table under a given name with field names as specified by
+    * field expressions in the [[TableEnvironment]]'s catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataSet The [[DataSet]] to register as table in the catalog.
+    * @param fields The field expressions to define the field names of the table.
+    * @tparam T The type of the [[DataSet]].
+    */
+  protected def registerDataSetInternal[T](
+      name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
+    getFieldInfo[T](dataSet.getType, fields) match {
+      case (names, indexes) =>
+        registerTableInternal(name, new DataSetTable[T](dataSet, indexes, names))
+    }
+  }
+
+  /**
+    * Returns the built-in rules of this batch environment, [[FlinkRuleSets.DATASET_OPT_RULES]].
+    */
+  protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES
+
+  /**
+    * Generates the optimized [[RelNode]] tree from the original relational node tree.
+    *
+    * @param relNode The original [[RelNode]] tree
+    * @return The optimized [[RelNode]] tree
+    */
+  private[flink] def optimize(relNode: RelNode): RelNode = {
+
+    // decorrelate
+    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
+
+    // optimize the logical Flink plan
+    val optProgram = Programs.ofRules(getRuleSet)
+    val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
+
+    try {
+      optProgram.run(getPlanner, decorPlan, flinkOutputProps)
+    } catch {
+      case e: CannotPlanException =>
+        throw new TableException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(relNode)}\n" +
+            s"This exception indicates that the query uses an unsupported SQL feature.\n" +
+            s"Please check the documentation for the set of currently supported SQL features.")
+      case t: TableException =>
+        throw new TableException(
+          s"Cannot generate a valid execution plan for the given query: \n\n" +
+            s"${RelOptUtil.toString(relNode)}\n" +
+            s"${t.msg}\n" +
+            s"Please check the documentation for the set of currently supported SQL features.")
+      case a: AssertionError =>
+        // rethrow the wrapped cause; without a cause `throw null` would raise an NPE instead
+        throw Option(a.getCause).getOrElse(a)
+    }
+  }
+
+  /**
+    * Translates a [[Table]] into a [[DataSet]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
+    *
+    * @param table The root node of the relational expression tree.
+    * @param tpe   The [[TypeInformation]] of the resulting [[DataSet]].
+    * @tparam A The type of the resulting [[DataSet]].
+    * @return The [[DataSet]] that corresponds to the translated [[Table]].
+    */
+  protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+    // optimize first, then hand the resulting plan to the RelNode-based overload
+    translate(optimize(table.getRelNode))(tpe)
+  }
+
+  /**
+    * Translates a logical [[RelNode]] into a [[DataSet]].
+    *
+    * @param logicalPlan The root node of the relational expression tree.
+    * @param tpe         The [[TypeInformation]] of the resulting [[DataSet]].
+    * @tparam A The type of the resulting [[DataSet]].
+    * @throws TableException if the root of the plan is not a [[DataSetRel]].
+    * @return The [[DataSet]] that corresponds to the translated [[RelNode]].
+    */
+  protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+    validateType(tpe)
+    // only DataSetRel nodes offer translateToPlan; any other root is rejected with an error
+    logicalPlan match {
+      case node: DataSetRel =>
+        node.translateToPlan(this, Some(tpe.asInstanceOf[TypeInformation[Any]]))
+          .asInstanceOf[DataSet[A]]
+      case other =>
+        throw new TableException(s"Cannot translate plan to DataSet, no DataSetRel: $other")
+    }
+  }
+}