You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:59 UTC

[30/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
deleted file mode 100644
index c1c79ec..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
-
-  protected var minIndex: Int = _
-
-  /**
-   * Initiate the intermediate aggregate value in Row.
-   *
-   * @param intermediate The intermediate aggregate row to initiate.
-   */
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, null)
-  }
-
-  /**
-   * Accessed in MapFunction, prepare the input of partial aggregate.
-   *
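-   * @param value The input value; if null, the partial field is reset to null.
-   * @param partial The partial aggregate row that receives the prepared value.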
-   */
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      partial.setField(minIndex, value)
-    }
-  }
-
-  /**
-   * Accessed in CombineFunction and GroupReduceFunction, merge partial
-   * aggregate result into aggregate buffer.
-   *
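-   * @param partial The partial aggregate row to merge; a null partial value is ignored.
-   * @param buffer The aggregate buffer row that is updated with the smaller value.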
-   */
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.getField(minIndex).asInstanceOf[T]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
-      if (bufferValue != null) {
-        val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
-        buffer.setField(minIndex, min)
-      } else {
-        buffer.setField(minIndex, partialValue)
-      }
-    }
-  }
-
-  /**
-   * Return the final aggregated result based on aggregate buffer.
-   *
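-   * @param buffer The aggregate buffer row to read the result from.
-   * @return The value stored at this aggregate's offset in the buffer.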
-   */
-  override def evaluate(buffer: Row): T = {
-    buffer.getField(minIndex).asInstanceOf[T]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    minIndex = aggOffset
-  }
-}
-
-class ByteMinAggregate extends MinAggregate[Byte] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-
-}
-
-class ShortMinAggregate extends MinAggregate[Short] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-
-}
-
-class IntMinAggregate extends MinAggregate[Int] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-
-}
-
-class LongMinAggregate extends MinAggregate[Long] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-}
-
-class FloatMinAggregate extends MinAggregate[Float] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-
-}
-
-class DoubleMinAggregate extends MinAggregate[Double] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-
-}
-
-class BooleanMinAggregate extends MinAggregate[Boolean] {
-
-  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
-
-}
-
-class DecimalMinAggregate extends Aggregate[BigDecimal] {
-
-  protected var minIndex: Int = _
-
-  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
-  override def initiate(intermediate: Row): Unit = {
-    intermediate.setField(minIndex, null)
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      partial.setField(minIndex, value)
-    }
-  }
-
-  override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
-      if (bufferValue != null) {
-        val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue
-        buffer.setField(minIndex, min)
-      } else {
-        buffer.setField(minIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    buffer.getField(minIndex).asInstanceOf[BigDecimal]
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    minIndex = aggOffset
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
deleted file mode 100644
index 16f1608..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class SumAggregate[T: Numeric]
-  extends Aggregate[T] {
-
-  private val numeric = implicitly[Numeric[T]]
-  protected var sumIndex: Int = _
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(sumIndex, null)
-  }
-
-  override def merge(partial1: Row, buffer: Row): Unit = {
-    val partialValue = partial1.getField(sumIndex).asInstanceOf[T]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(sumIndex).asInstanceOf[T]
-      if (bufferValue != null) {
-        buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
-      } else {
-        buffer.setField(sumIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): T = {
-    buffer.getField(sumIndex).asInstanceOf[T]
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      val input = value.asInstanceOf[T]
-      partial.setField(sumIndex, input)
-    }
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    sumIndex = aggOffset
-  }
-}
-
-class ByteSumAggregate extends SumAggregate[Byte] {
-  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-}
-
-class ShortSumAggregate extends SumAggregate[Short] {
-  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-}
-
-class IntSumAggregate extends SumAggregate[Int] {
-  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-}
-
-class LongSumAggregate extends SumAggregate[Long] {
-  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-}
-
-class FloatSumAggregate extends SumAggregate[Float] {
-  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-}
-
-class DoubleSumAggregate extends SumAggregate[Double] {
-  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-}
-
-class DecimalSumAggregate extends Aggregate[BigDecimal] {
-
-  protected var sumIndex: Int = _
-
-  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
-  override def initiate(partial: Row): Unit = {
-    partial.setField(sumIndex, null)
-  }
-
-  override def merge(partial1: Row, buffer: Row): Unit = {
-    val partialValue = partial1.getField(sumIndex).asInstanceOf[BigDecimal]
-    if (partialValue != null) {
-      val bufferValue = buffer.getField(sumIndex).asInstanceOf[BigDecimal]
-      if (bufferValue != null) {
-        buffer.setField(sumIndex, partialValue.add(bufferValue))
-      } else {
-        buffer.setField(sumIndex, partialValue)
-      }
-    }
-  }
-
-  override def evaluate(buffer: Row): BigDecimal = {
-    buffer.getField(sumIndex).asInstanceOf[BigDecimal]
-  }
-
-  override def prepare(value: Any, partial: Row): Unit = {
-    if (value == null) {
-      initiate(partial)
-    } else {
-      val input = value.asInstanceOf[BigDecimal]
-      partial.setField(sumIndex, input)
-    }
-  }
-
-  override def supportPartial: Boolean = true
-
-  override def setAggOffsetInRow(aggOffset: Int): Unit = {
-    sumIndex = aggOffset
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
deleted file mode 100644
index 417c1f1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-/**
-  * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
-  * collector.
-  */
-class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
-    extends Collector[Row] {
-
-  var wrappedCollector: Collector[Row] = _
-  var timeWindow: TimeWindow = _
-
-  override def collect(record: Row): Unit = {
-
-    val lastFieldPos = record.getArity - 1
-
-    if (windowStartOffset.isDefined) {
-      record.setField(
-        lastFieldPos + windowStartOffset.get,
-        SqlFunctions.internalToTimestamp(timeWindow.getStart))
-    }
-    if (windowEndOffset.isDefined) {
-      record.setField(
-        lastFieldPos + windowEndOffset.get,
-        SqlFunctions.internalToTimestamp(timeWindow.getEnd))
-    }
-    wrappedCollector.collect(record)
-  }
-
-  override def close(): Unit = wrappedCollector.close()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
deleted file mode 100644
index 2a4be46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.io
-
-import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.core.io.GenericInputSplit
-import org.slf4j.LoggerFactory
-
-class ValuesInputFormat[OUT](
-    name: String,
-    code: String,
-    @transient returnType: TypeInformation[OUT])
-  extends GenericInputFormat[OUT]
-  with NonParallelInput
-  with ResultTypeQueryable[OUT]
-  with Compiler[GenericInputFormat[OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var format: GenericInputFormat[OUT] = _
-
-  override def open(split: GenericInputSplit): Unit = {
-    LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating GenericInputFormat.")
-    format = clazz.newInstance()
-  }
-
-  override def reachedEnd(): Boolean = format.reachedEnd()
-
-  override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
-
-  override def getProducedType: TypeInformation[OUT] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
deleted file mode 100644
index 27dbe8e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.java.DataSet
-
-/** Defines an external [[TableSink]] to emit a batch [[org.apache.flink.api.table.Table]].
-  *
-  * @tparam T Type of [[DataSet]] that this [[TableSink]] expects and supports.
-  */
-trait BatchTableSink[T] extends TableSink[T] {
-
-  /** Emits the DataSet. */
-  def emitDataSet(dataSet: DataSet[T]): Unit
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
deleted file mode 100644
index 5038d9b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
-  * A simple [[TableSink]] to emit data as CSV files.
-  *
-  * @param path The output path to write the Table to.
-  * @param fieldDelim The field delimiter, ',' by default.
-  */
-class CsvTableSink(
-    path: String,
-    fieldDelim: String = ",")
-  extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
-
-  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
-    dataSet
-      .map(new CsvFormatter(fieldDelim))
-      .writeAsText(path)
-  }
-
-  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
-    dataStream
-      .map(new CsvFormatter(fieldDelim))
-      .writeAsText(path)
-  }
-
-  override protected def copy: TableSinkBase[Row] = {
-    new CsvTableSink(path, fieldDelim)
-  }
-
-  override def getOutputType: TypeInformation[Row] = {
-    new RowTypeInfo(getFieldTypes: _*)
-  }
-}
-
-/**
-  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
-  *
-  * @param fieldDelim The field delimiter.
-  */
-class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
-  override def map(row: Row): String = {
-
-    val builder = new StringBuilder
-
-    // write first value
-    val v = row.getField(0)
-    if (v != null) {
-      builder.append(v.toString)
-    }
-
-    // write following values
-    for (i <- 1 until row.getArity) {
-      builder.append(fieldDelim)
-      val v = row.getField(i)
-      if (v != null) {
-        builder.append(v.toString)
-      }
-    }
-    builder.mkString
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
deleted file mode 100644
index 61ef3b2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/** Defines an external [[TableSink]] to emit a streaming [[org.apache.flink.api.table.Table]].
-  *
-  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
-  */
-trait StreamTableSink[T] extends TableSink[T] {
-
-  /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): Unit
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
deleted file mode 100644
index 3dfc6f1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/** A [[TableSink]] specifies how to emit a [[org.apache.flink.api.table.Table]] to an external
-  * system or location.
-  *
-  * The interface is generic such that it can support different storage locations and formats.
-  *
-  * @tparam T The return type of the [[TableSink]].
-  */
-trait TableSink[T] {
-
-  /**
-    * Return the type expected by this [[TableSink]].
-    *
-    * This type should depend on the types returned by [[getFieldTypes]].
-    *
-    * @return The type expected by this [[TableSink]].
-    */
-  def getOutputType: TypeInformation[T]
-
-  /** Returns the names of the table fields. */
-  def getFieldNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
-
-  /**
-    * Return a copy of this [[TableSink]] configured with the field names and types of the
-    * [[org.apache.flink.api.table.Table]] to emit.
-    *
-    * @param fieldNames The field names of the table to emit.
-    * @param fieldTypes The field types of the table to emit.
-    * @return A copy of this [[TableSink]] configured with the field names and types of the
-    *         [[org.apache.flink.api.table.Table]] to emit.
-    */
-  def configure(fieldNames: Array[String],
-                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
deleted file mode 100644
index 612ee0a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-trait TableSinkBase[T] extends TableSink[T] {
-
-  private var fieldNames: Option[Array[String]] = None
-  private var fieldTypes: Option[Array[TypeInformation[_]]] = None
-
-  /** Return a deep copy of the [[TableSink]]. */
-  protected def copy: TableSinkBase[T]
-
-  /**
-    * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */
-  def getFieldNames: Array[String] = {
-    fieldNames match {
-      case Some(n) => n
-      case None => throw new IllegalStateException(
-        "TableSink must be configured to retrieve field names.")
-    }
-  }
-
-  /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */
-  def getFieldTypes: Array[TypeInformation[_]] = {
-    fieldTypes match {
-      case Some(t) => t
-      case None => throw new IllegalStateException(
-        "TableSink must be configured to retrieve field types.")
-    }
-  }
-
-  /**
-    * Return a copy of this [[TableSink]] configured with the field names and types of the
-    * [[org.apache.flink.api.table.Table]] to emit.
-    *
-    * @param fieldNames The field names of the table to emit.
-    * @param fieldTypes The field types of the table to emit.
-    * @return A copy of this [[TableSink]] configured with the field names and types of the
-    *         [[org.apache.flink.api.table.Table]] to emit.
-    */
-  final def configure(fieldNames: Array[String],
-                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
-
-    val configuredSink = this.copy
-    configuredSink.fieldNames = Some(fieldNames)
-    configuredSink.fieldTypes = Some(fieldTypes)
-
-    configuredSink
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
deleted file mode 100644
index 74e4cd6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
-
-/** Defines an external batch table and provides access to its data.
-  *
-  * @tparam T Type of the [[DataSet]] created by this [[TableSource]].
-  */
-trait BatchTableSource[T] extends TableSource[T] {
-
-  /**
-    * Returns the data of the table as a [[DataSet]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
deleted file mode 100644
index b60575a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.TableException
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.io.RowCsvInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
-  * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-  * (logically) unlimited number of fields.
-  *
-  * @param path The path to the CSV file.
-  * @param fieldNames The names of the table fields.
-  * @param fieldTypes The types of the table fields.
-  * @param fieldDelim The field delimiter, "," by default.
-  * @param rowDelim The row delimiter, "\n" by default.
-  * @param quoteCharacter An optional quote character for String values, null by default.
-  * @param ignoreFirstLine Flag to ignore the first line, false by default.
-  * @param ignoreComments An optional prefix to indicate comments, null by default.
-  * @param lenient Flag to skip records with parse errors instead of failing, false by default.
-  */
-class CsvTableSource(
-    path: String,
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]],
-    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
-    quoteCharacter: Character = null,
-    ignoreFirstLine: Boolean = false,
-    ignoreComments: String = null,
-    lenient: Boolean = false)
-  extends BatchTableSource[Row]
-  with StreamTableSource[Row] {
-
-  /**
-    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
-    * (logically) unlimited number of fields.
-    *
-    * @param path The path to the CSV file.
-    * @param fieldNames The names of the table fields.
-    * @param fieldTypes The types of the table fields.
-    */
-  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
-    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
-      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
-
-  if (fieldNames.length != fieldTypes.length) {
-    throw TableException("Number of field names and field types must be equal.")
-  }
-
-  private val returnType = new RowTypeInfo(fieldTypes: _*)
-
-  /**
-    * Returns the data of the table as a [[DataSet]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
-    execEnv.createInput(createCsvInput(), returnType)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = fieldNames
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = fieldNames.length
-
-  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
-  override def getReturnType: RowTypeInfo = returnType
-
-  /**
-    * Returns the data of the table as a [[DataStream]] of [[Row]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    streamExecEnv.createInput(createCsvInput(), returnType)
-  }
-
-  private def createCsvInput(): RowCsvInputFormat = {
-    val inputFormat = new RowCsvInputFormat(new Path(path), returnType, rowDelim, fieldDelim)
-
-    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
-    inputFormat.setLenient(lenient)
-    if (quoteCharacter != null) {
-      inputFormat.enableQuotedStringParsing(quoteCharacter)
-    }
-    if (ignoreComments != null) {
-      inputFormat.setCommentPrefix(ignoreComments)
-    }
-
-    inputFormat
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
deleted file mode 100644
index c04138a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-/**
-  * Adds support for projection push-down to a [[TableSource]].
-  * A [[TableSource]] extending this interface is able to project the fields of the return table.
-  *
-  * @tparam T The return type of the [[ProjectableTableSource]].
-  */
-trait ProjectableTableSource[T] {
-
-  /**
-    * Creates a copy of the [[ProjectableTableSource]] that projects its output on the specified
-    * fields.
-    *
-    * @param fields The indexes of the fields to return.
-    * @return A copy of the [[ProjectableTableSource]] that projects its output.
-    */
-  def projectFields(fields: Array[Int]): ProjectableTableSource[T]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
deleted file mode 100644
index cdae0b3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/** Defines an external stream table and provides access to its data.
-  *
-  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
-  */
-trait StreamTableSource[T] extends TableSource[T] {
-
-  /**
-    * Returns the data of the table as a [[DataStream]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    *       Do not use it in Table API programs.
-    */
-  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala
deleted file mode 100644
index e1ada62..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/** Defines an external table by providing schema information, i.e., field names and types.
-  *
-  * @tparam T The return type of the [[TableSource]].
-  */
-trait TableSource[T] {
-
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
-  /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
-
-  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
-  def getReturnType: TypeInformation[T]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
deleted file mode 100644
index 94c8e8c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ /dev/null
@@ -1,922 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.ProjectionTranslator._
-import org.apache.flink.api.table.plan.logical.{Minus, _}
-import org.apache.flink.api.table.sinks.TableSink
-
-import scala.collection.JavaConverters._
-
-/**
-  * A Table is the core component of the Table API.
-  * Similar to how the batch and streaming APIs have DataSet and DataStream,
-  * the Table API is built around [[Table]].
-  *
-  * Use the methods of [[Table]] to transform data. Use [[TableEnvironment]] to convert a [[Table]]
-  * back to a DataSet or DataStream.
-  *
-  * When using Scala a [[Table]] can also be converted using implicit conversions.
-  *
-  * Example:
-  *
-  * {{{
-  *   val env = ExecutionEnvironment.getExecutionEnvironment
-  *   val tEnv = TableEnvironment.getTableEnvironment(env)
-  *
-  *   val set: DataSet[(String, Int)] = ...
-  *   val table = set.toTable(tEnv, 'a, 'b)
-  *   ...
-  *   val table2 = ...
-  *   val set2: DataSet[MyType] = table2.toDataSet[MyType]
-  * }}}
-  *
-  * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
-  * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
-  * syntax.
-  *
-  * @param tableEnv The [[TableEnvironment]] to which the table is bound.
-  * @param logicalPlan logical representation
-  */
-class Table(
-    private[flink] val tableEnv: TableEnvironment,
-    private[flink] val logicalPlan: LogicalNode) {
-
-  def relBuilder = tableEnv.getRelBuilder
-
-  def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
-
-  /**
-    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
-    * can contain complex expressions and aggregations.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.select('key, 'value.avg + " The average" as 'average)
-    * }}}
-    */
-  def select(fields: Expression*): Table = {
-    val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
-    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv)
-    if (propNames.nonEmpty) {
-      throw ValidationException("Window properties can only be used on windowed tables.")
-    }
-
-    if (aggNames.nonEmpty) {
-      val projectsOnAgg = replaceAggregationsAndProperties(
-        expandedFields, tableEnv, aggNames, propNames)
-      val projectFields = extractFieldReferences(expandedFields)
-
-      new Table(tableEnv,
-        Project(projectsOnAgg,
-          Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq,
-            Project(projectFields, logicalPlan).validate(tableEnv)
-          ).validate(tableEnv)
-        ).validate(tableEnv)
-      )
-    } else {
-      new Table(tableEnv,
-        Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv))
-    }
-  }
-
-  /**
-    * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
-    * can contain complex expressions and aggregations.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.select("key, value.avg + ' The average' as average")
-    * }}}
-    */
-  def select(fields: String): Table = {
-    val fieldExprs = ExpressionParser.parseExpressionList(fields)
-    select(fieldExprs: _*)
-  }
-
-  /**
-    * Renames the fields of the expression result. Use this to disambiguate fields before
-    * joining two operations.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.as('a, 'b)
-    * }}}
-    */
-  def as(fields: Expression*): Table = {
-    new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
-  }
-
-  /**
-    * Renames the fields of the expression result. Use this to disambiguate fields before
-    * joining two operations.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.as("a, b")
-    * }}}
-    */
-  def as(fields: String): Table = {
-    val fieldExprs = ExpressionParser.parseExpressionList(fields)
-    as(fieldExprs: _*)
-  }
-
-  /**
-    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-    * clause.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.filter('name === "Fred")
-    * }}}
-    */
-  def filter(predicate: Expression): Table = {
-    new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
-  }
-
-  /**
-    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-    * clause.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.filter("name = 'Fred'")
-    * }}}
-    */
-  def filter(predicate: String): Table = {
-    val predicateExpr = ExpressionParser.parseExpression(predicate)
-    filter(predicateExpr)
-  }
-
-  /**
-    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-    * clause.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.where('name === "Fred")
-    * }}}
-    */
-  def where(predicate: Expression): Table = {
-    filter(predicate)
-  }
-
-  /**
-    * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
-    * clause.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.where("name = 'Fred'")
-    * }}}
-    */
-  def where(predicate: String): Table = {
-    filter(predicate)
-  }
-
-  /**
-    * Groups the elements on some grouping keys. Use this before a selection with aggregations
-    * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.groupBy('key).select('key, 'value.avg)
-    * }}}
-    */
-  def groupBy(fields: Expression*): GroupedTable = {
-    new GroupedTable(this, fields)
-  }
-
-  /**
-    * Groups the elements on some grouping keys. Use this before a selection with aggregations
-    * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.groupBy("key").select("key, value.avg")
-    * }}}
-    */
-  def groupBy(fields: String): GroupedTable = {
-    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
-    groupBy(fieldsExpr: _*)
-  }
-
-  /**
-    * Removes duplicate values and returns only distinct (different) values.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.select("key, value").distinct()
-    * }}}
-    */
-  def distinct(): Table = {
-    new Table(tableEnv, Distinct(logicalPlan).validate(tableEnv))
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary. You can use
-    * where and select clauses after a join to further specify the behaviour of the join.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
-    * }}}
-    */
-  def join(right: Table): Table = {
-    join(right, None, JoinType.INNER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.join(right, "a = b")
-    * }}}
-    */
-  def join(right: Table, joinPredicate: String): Table = {
-    join(right, joinPredicate, JoinType.INNER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.join(right, 'a === 'b).select('a, 'b, 'd)
-    * }}}
-    */
-  def join(right: Table, joinPredicate: Expression): Table = {
-    join(right, Some(joinPredicate), JoinType.INNER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
-    * have nullCheck enabled.
-    *
-    * Example:
-    *
-    * {{{
-    *   left.leftOuterJoin(right, "a = b").select('a, 'b, 'd)
-    * }}}
-    */
-  def leftOuterJoin(right: Table, joinPredicate: String): Table = {
-    join(right, joinPredicate, JoinType.LEFT_OUTER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
-    * have nullCheck enabled.
-    *
-    * Example:
-    *
-    * {{{
-    *   left.leftOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
-    * }}}
-    */
-  def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
-    join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
-    * have nullCheck enabled.
-    *
-    * Example:
-    *
-    * {{{
-    *   left.rightOuterJoin(right, "a = b").select('a, 'b, 'd)
-    * }}}
-    */
-  def rightOuterJoin(right: Table, joinPredicate: String): Table = {
-    join(right, joinPredicate, JoinType.RIGHT_OUTER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
-    * have nullCheck enabled.
-    *
-    * Example:
-    *
-    * {{{
-    *   left.rightOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
-    * }}}
-    */
-  def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
-    join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
-    * have nullCheck enabled.
-    *
-    * Example:
-    *
-    * {{{
-    *   left.fullOuterJoin(right, "a = b").select('a, 'b, 'd)
-    * }}}
-    */
-  def fullOuterJoin(right: Table, joinPredicate: String): Table = {
-    join(right, joinPredicate, JoinType.FULL_OUTER)
-  }
-
-  /**
-    * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
-    * operations must not overlap, use [[as]] to rename fields if necessary.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
-    * have nullCheck enabled.
-    *
-    * Example:
-    *
-    * {{{
-    *   left.fullOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
-    * }}}
-    */
-  def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
-    join(right, Some(joinPredicate), JoinType.FULL_OUTER)
-  }
-
-  private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
-    val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
-    join(right, Some(joinPredicateExpr), joinType)
-  }
-
-  private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
-    }
-    new Table(
-      tableEnv,
-      Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
-        .validate(tableEnv))
-  }
-
-  /**
-    * Minus of two [[Table]]s with duplicate records removed.
-    * Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not
-    * exist in the right table. Duplicate records in the left table are returned
-    * exactly once, i.e., duplicates are removed. Both tables must have identical field types.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.minus(right)
-    * }}}
-    */
-  def minus(right: Table): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException("Only tables from the same TableEnvironment can be " +
-        "subtracted.")
-    }
-    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
-      .validate(tableEnv))
-  }
-
-  /**
-    * Minus of two [[Table]]s.
-    * Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in
-    * the right table. A record that is present n times in the left table and m times
-    * in the right table is returned (n - m) times, i.e., as many duplicates as are present
-    * in the right table are removed. Both tables must have identical field types.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.minusAll(right)
-    * }}}
-    */
-  def minusAll(right: Table): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException("Only tables from the same TableEnvironment can be " +
-        "subtracted.")
-    }
-    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
-      .validate(tableEnv))
-  }
-
-  /**
-    * Unions two [[Table]]s with duplicate records removed.
-    * Similar to an SQL UNION. The fields of the two union operations must fully overlap.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.union(right)
-    * }}}
-    */
-  def union(right: Table): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
-    }
-    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
-  }
-
-  /**
-    * Unions two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations
-    * must fully overlap.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.unionAll(right)
-    * }}}
-    */
-  def unionAll(right: Table): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
-    }
-    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
-  }
-
-  /**
-    * Intersects two [[Table]]s with duplicate records removed. Intersect returns records that
-    * exist in both tables. If a record is present in one or both tables more than once, it is
-    * returned just once, i.e., the resulting table has no duplicate records. Similar to an
-    * SQL INTERSECT. The fields of the two intersect operations must fully overlap.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.intersect(right)
-    * }}}
-    */
-  def intersect(right: Table): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException(
-        "Only tables from the same TableEnvironment can be intersected.")
-    }
-    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
-  }
-
-  /**
-    * Intersects two [[Table]]s. IntersectAll returns records that exist in both tables.
-    * If a record is present in both tables more than once, it is returned as many times as it
-    * is present in both tables, i.e., the resulting table might have duplicate records. Similar
-    * to an SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap.
-    *
-    * Note: Both tables must be bound to the same [[TableEnvironment]].
-    *
-    * Example:
-    *
-    * {{{
-    *   left.intersectAll(right)
-    * }}}
-    */
-  def intersectAll(right: Table): Table = {
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new ValidationException(
-        "Only tables from the same TableEnvironment can be intersected.")
-    }
-    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
-  }
-
-  /**
-    * Sorts the given [[Table]]. Similar to SQL ORDER BY.
-    * The resulting Table is globally sorted across all parallel partitions.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.orderBy('name.desc)
-    * }}}
-    */
-  def orderBy(fields: Expression*): Table = {
-    val order: Seq[Ordering] = fields.map {
-      case o: Ordering => o
-      case e => Asc(e)
-    }
-    new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
-  }
-
-  /**
-    * Sorts the given [[Table]]. Similar to SQL ORDER BY.
-    * The resulting Table is globally sorted across all parallel partitions.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.orderBy("name.desc")
-    * }}}
-    */
-  def orderBy(fields: String): Table = {
-    val parsedFields = ExpressionParser.parseExpressionList(fields)
-    orderBy(parsedFields: _*)
-  }
-
-  /**
-    * Limits a sorted result from an offset position.
-    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
-    * thus must be preceded by it.
-    *
-    * Example:
-    *
-    * {{{
-    *   // returns unlimited number of records beginning with the 4th record
-    *   tab.orderBy('name.desc).limit(3)
-    * }}}
-    *
-    * @param offset number of records to skip
-    */
-  def limit(offset: Int): Table = {
-    new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
-  }
-
-  /**
-    * Limits a sorted result to a specified number of records from an offset position.
-    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
-    * thus must be preceded by it.
-    *
-    * Example:
-    *
-    * {{{
-    *   // returns 5 records beginning with the 4th record
-    *   tab.orderBy('name.desc).limit(3, 5)
-    * }}}
-    *
-    * @param offset number of records to skip
-    * @param fetch number of records to be returned
-    */
-  def limit(offset: Int, fetch: Int): Table = {
-    new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
-  }
-
-  /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL cross join, but it works with a table function. It returns rows from the outer
-    * table (table on the left of the operator) that produce matching values from the table
-    * function (which is defined in the expression on the right side of the operator).
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction[String] {
-    *     def eval(str: String): Unit = {
-    *       str.split("#").foreach(collect)
-    *     }
-    *   }
-    *
-    *   val split = new MySplitUDTF()
-    *   table.join(split('c) as ('s)).select('a,'b,'c,'s)
-    * }}}
-    */
-  def join(udtf: Expression): Table = {
-    joinUdtfInternal(udtf, JoinType.INNER)
-  }
-
-  /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL cross join, but it works with a table function. It returns rows from the outer
-    * table (table on the left of the operator) that produce matching values from the table
-    * function (which is defined in the expression on the right side of the operator).
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction<String> {
-    *     public void eval(String str) {
-    *       str.split("#").forEach(this::collect);
-    *     }
-    *   }
-    *
-    *   TableFunction<String> split = new MySplitUDTF();
-    *   tableEnv.registerFunction("split", split);
-    *
-    *   table.join("split(c) as (s)").select("a, b, c, s");
-    * }}}
-    */
-  def join(udtf: String): Table = {
-    joinUdtfInternal(udtf, JoinType.INNER)
-  }
-
-  /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
-    * the rows from the outer table (table on the left of the operator), and rows that do not match
-    * the condition from the table function (which is defined in the expression on the right
-    * side of the operator). Rows with no matching condition are filled with null values.
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction[String] {
-    *     def eval(str: String): Unit = {
-    *       str.split("#").foreach(collect)
-    *     }
-    *   }
-    *
-    *   val split = new MySplitUDTF()
-    *   table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
-    * }}}
-    */
-  def leftOuterJoin(udtf: Expression): Table = {
-    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
-  }
-
-  /**
-    * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
-    * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
-    * the rows from the outer table (table on the left of the operator), and rows that do not match
-    * the condition from the table function (which is defined in the expression on the right
-    * side of the operator). Rows with no matching condition are filled with null values.
-    *
-    * Example:
-    *
-    * {{{
-    *   class MySplitUDTF extends TableFunction<String> {
-    *     public void eval(String str) {
-    *       str.split("#").forEach(this::collect);
-    *     }
-    *   }
-    *
-    *   TableFunction<String> split = new MySplitUDTF();
-    *   tableEnv.registerFunction("split", split);
-    *
-    *   table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
-    * }}}
-    */
-  def leftOuterJoin(udtf: String): Table = {
-    joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
-  }
-
-  private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
-    val udtf = ExpressionParser.parseExpression(udtfString)
-    joinUdtfInternal(udtf, joinType)
-  }
-
-  private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
-    var alias: Option[Seq[String]] = None
-
-    // unwrap an Expression until we get a TableFunctionCall
-    def unwrap(expr: Expression): TableFunctionCall = expr match {
-      case Alias(child, name, extraNames) =>
-        alias = Some(Seq(name) ++ extraNames)
-        unwrap(child)
-      case Call(name, args) =>
-        val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
-        unwrap(function)
-      case c: TableFunctionCall => c
-      case _ =>
-        throw new TableException(
-          "Cross/Outer Apply operators only accept expressions that define table functions.")
-    }
-
-    val call = unwrap(udtf)
-      .as(alias)
-      .toLogicalTableFunctionCall(this.logicalPlan)
-      .validate(tableEnv)
-
-    new Table(
-      tableEnv,
-      Join(this.logicalPlan, call, joinType, None, correlated = true).validate(tableEnv))
-  }
-
-  /**
-    * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
-    *
-    * A batch [[Table]] can only be written to a
-    * [[org.apache.flink.api.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
-    * [[org.apache.flink.api.table.sinks.StreamTableSink]].
-    *
-    * @param sink The [[TableSink]] to which the [[Table]] is written.
-    * @tparam T The data type that the [[TableSink]] expects.
-    */
-  def writeToSink[T](sink: TableSink[T]): Unit = {
-
-    // get schema information of table
-    val rowType = getRelNode.getRowType
-    val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
-    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
-
-    // configure the table sink
-    val configuredSink = sink.configure(fieldNames, fieldTypes)
-
-    // emit the table to the configured table sink
-    tableEnv.writeToSink(this, configuredSink)
-  }
-
-  /**
-    * Groups the records of a table by assigning them to windows defined by a time or row interval.
-    *
-    * For streaming tables of infinite size, grouping into windows is required to define finite
-    * groups on which group-based aggregates can be computed.
-    *
-    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
-    * groupBy.
-    *
-    * __Note__: a window on a non-grouped streaming table is a non-parallel operation, i.e., all
-    * data will be processed by a single operator.
-    *
-    * @param groupWindow group-window that specifies how elements are grouped.
-    * @return A windowed table.
-    */
-  def window(groupWindow: GroupWindow): GroupWindowedTable = {
-    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
-      throw new ValidationException(s"Windows on batch tables are currently not supported.")
-    }
-    new GroupWindowedTable(this, Seq(), groupWindow)
-  }
-}
-
-/**
-  * A table that has been grouped on a set of grouping keys.
-  */
-class GroupedTable(
-  private[flink] val table: Table,
-  private[flink] val groupKey: Seq[Expression]) {
-
-  /**
-    * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
-    * The field expressions can contain complex expressions and aggregations.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.groupBy('key).select('key, 'value.avg + " The average" as 'average)
-    * }}}
-    */
-  def select(fields: Expression*): Table = {
-    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
-    if (propNames.nonEmpty) {
-      throw ValidationException("Window properties can only be used on windowed tables.")
-    }
-
-    val projectsOnAgg = replaceAggregationsAndProperties(
-      fields, table.tableEnv, aggNames, propNames)
-    val projectFields = extractFieldReferences(fields ++ groupKey)
-
-    new Table(table.tableEnv,
-      Project(projectsOnAgg,
-        Aggregate(groupKey, aggNames.map(a => Alias(a._1, a._2)).toSeq,
-          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
-        ).validate(table.tableEnv)
-      ).validate(table.tableEnv))
-  }
-
-  /**
-    * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
-    * The field expressions can contain complex expressions and aggregations.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.groupBy("key").select("key, value.avg + ' The average' as average")
-    * }}}
-    */
-  def select(fields: String): Table = {
-    val fieldExprs = ExpressionParser.parseExpressionList(fields)
-    select(fieldExprs: _*)
-  }
-
-  /**
-    * Groups the records of a table by assigning them to windows defined by a time or row interval.
-    *
-    * For streaming tables of infinite size, grouping into windows is required to define finite
-    * groups on which group-based aggregates can be computed.
-    *
-    * For batch tables of finite size, windowing essentially provides shortcuts for time-based
-    * groupBy.
-    *
-    * @param groupWindow group-window that specifies how elements are grouped.
-    * @return A windowed table.
-    */
-  def window(groupWindow: GroupWindow): GroupWindowedTable = {
-    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
-      throw new ValidationException(s"Windows on batch tables are currently not supported.")
-    }
-    new GroupWindowedTable(table, groupKey, groupWindow)
-  }
-}
-
-class GroupWindowedTable(
-    private[flink] val table: Table,
-    private[flink] val groupKey: Seq[Expression],
-    private[flink] val window: GroupWindow) {
-
-  /**
-    * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
-    * The field expressions can contain complex expressions and aggregations.
-    *
-    * Example:
-    *
-    * {{{
-    *   groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
-    * }}}
-    */
-  def select(fields: Expression*): Table = {
-    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
-    val projectsOnAgg = replaceAggregationsAndProperties(
-      fields, table.tableEnv, aggNames, propNames)
-
-    val projectFields = (table.tableEnv, window) match {
-      // event time can be arbitrary field in batch environment
-      case (_: BatchTableEnvironment, w: EventTimeWindow) =>
-        extractFieldReferences(fields ++ groupKey ++ Seq(w.timeField))
-      case (_, _) =>
-        extractFieldReferences(fields ++ groupKey)
-    }
-
-    new Table(table.tableEnv,
-      Project(
-        projectsOnAgg,
-        WindowAggregate(
-          groupKey,
-          window.toLogicalWindow,
-          propNames.map(a => Alias(a._1, a._2)).toSeq,
-          aggNames.map(a => Alias(a._1, a._2)).toSeq,
-          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
-        ).validate(table.tableEnv)
-      ).validate(table.tableEnv))
-  }
-
-  /**
-    * Performs a selection operation on a group-windowed table. Similar to an SQL SELECT statement.
-    * The field expressions can contain complex expressions and aggregations.
-    *
-    * Example:
-    *
-    * {{{
-    *   groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
-    * }}}
-    */
-  def select(fields: String): Table = {
-    val fieldExprs = ExpressionParser.parseExpressionList(fields)
-    select(fieldExprs: _*)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
deleted file mode 100644
index 8eecc74..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.trees
-
-import org.apache.commons.lang.ClassUtils
-
-/**
- * Generic base class for trees that can be transformed and traversed.
- */
-abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
-
-  /**
-   * List of child nodes that should be considered when doing transformations. Other values
-   * in the Product will not be transformed, only handed through.
-   */
-  private[flink] def children: Seq[A]
-
-  /**
-   * Tests for equality by first testing for reference equality.
-   */
-  private[flink] def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
-
-  /**
-    * Do tree transformation in post order.
-    */
-  private[flink] def postOrderTransform(rule: PartialFunction[A, A]): A = {
-    def childrenTransform(rule: PartialFunction[A, A]): A = {
-      var changed = false
-      val newArgs = productIterator.map {
-        case arg: TreeNode[_] if children.contains(arg) =>
-          val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
-          if (!(newChild fastEquals arg)) {
-            changed = true
-            newChild
-          } else {
-            arg
-          }
-        case args: Traversable[_] => args.map {
-          case arg: TreeNode[_] if children.contains(arg) =>
-            val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
-            if (!(newChild fastEquals arg)) {
-              changed = true
-              newChild
-            } else {
-              arg
-            }
-          case other => other
-        }
-        case nonChild: AnyRef => nonChild
-        case null => null
-      }.toArray
-      if (changed) makeCopy(newArgs) else this
-    }
-
-    val afterChildren = childrenTransform(rule)
-    if (afterChildren fastEquals this) {
-      rule.applyOrElse(this, identity[A])
-    } else {
-      rule.applyOrElse(afterChildren, identity[A])
-    }
-  }
-
-  /**
-    * Runs the given function first on the node and then recursively on all its children.
-    */
-  private[flink] def preOrderVisit(f: A => Unit): Unit = {
-    f(this)
-    children.foreach(_.preOrderVisit(f))
-  }
-
-  /**
-   * Creates a new copy of this expression with new children. This is used during transformation
-   * if children change.
-   */
-  private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
-    val ctors = getClass.getConstructors.filter(_.getParameterTypes.size > 0)
-    if (ctors.isEmpty) {
-      throw new RuntimeException(s"No valid constructor for ${getClass.getSimpleName}")
-    }
-
-    val defaultCtor = ctors.find { ctor =>
-      if (ctor.getParameterTypes.size != newArgs.length) {
-        false
-      } else if (newArgs.contains(null)) {
-        false
-      } else {
-        val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
-        ClassUtils.isAssignable(argsClasses, ctor.getParameterTypes)
-      }
-    }.getOrElse(ctors.maxBy(_.getParameterTypes.size))
-
-    try {
-      defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
-    } catch {
-      case e: Throwable =>
-        throw new RuntimeException(
-          s"Fail to copy treeNode ${getClass.getName}: ${e.getStackTraceString}")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
deleted file mode 100644
index 2f896ec..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import java.util.Objects
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.util.Preconditions._
-
-/**
-  * TypeInformation for internal types of the Table API that are for translation purposes only
-  * and should not be contained in final plan.
-  */
-@SerialVersionUID(-13064574364925255L)
-abstract class InternalTypeInfo[T](val clazz: Class[T])
-  extends TypeInformation[T]
-  with AtomicType[T] {
-
-  checkNotNull(clazz)
-
-  override def isBasicType: Boolean =
-    throw new UnsupportedOperationException("This type is for internal use only.")
-
-  override def isTupleType: Boolean =
-    throw new UnsupportedOperationException("This type is for internal use only.")
-
-  override def getArity: Int =
-    throw new UnsupportedOperationException("This type is for internal use only.")
-
-  override def getTotalFields: Int =
-    throw new UnsupportedOperationException("This type is for internal use only.")
-
-  override def getTypeClass: Class[T] = clazz
-
-  override def isKeyType: Boolean =
-    throw new UnsupportedOperationException("This type is for internal use only.")
-
-  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] =
-    throw new UnsupportedOperationException("This type is for internal use only.")
-
-  override def createComparator(
-      sortOrderAscending: Boolean,
-      executionConfig: ExecutionConfig)
-    : TypeComparator[T] =
-    throw new UnsupportedOperationException("This type is for internal use only.")
-
-  // ----------------------------------------------------------------------------------------------
-
-  override def hashCode: Int = Objects.hash(clazz)
-
-  def canEqual(obj: Any): Boolean
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case other: InternalTypeInfo[_] =>
-        other.canEqual(this) && (this.clazz eq other.clazz)
-      case _ =>
-        false
-    }
-  }
-
-  override def toString: String = s"InternalTypeInfo"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
deleted file mode 100644
index 4dc83d0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-/**
-  * TypeInformation for row intervals.
-  */
-@SerialVersionUID(-1306179424364925258L)
-class RowIntervalTypeInfo extends InternalTypeInfo[Long](classOf[Long]) {
-
-  def canEqual(obj: Any): Boolean = obj.isInstanceOf[RowIntervalTypeInfo]
-
-  override def toString: String = s"RowIntervalTypeInfo"
-}
-
-object RowIntervalTypeInfo {
-
-  val INTERVAL_ROWS = new RowIntervalTypeInfo()
-
-}