Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:45 UTC

[16/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
new file mode 100644
index 0000000..88cb058
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.types.Row
+
+abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
+
+  protected var minIndex: Int = _
+
+  /**
+   * Initializes the intermediate aggregate value in the Row.
+   *
+   * @param intermediate The intermediate aggregate row to initialize.
+   */
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(minIndex, null)
+  }
+
+  /**
+   * Called by a MapFunction to prepare the input for partial aggregation.
+   *
+   * @param value The input value.
+   * @param partial The intermediate aggregate row into which the prepared value is written.
+   */
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      initiate(partial)
+    } else {
+      partial.setField(minIndex, value)
+    }
+  }
+
+  /**
+   * Called by a CombineFunction or GroupReduceFunction to merge a partial
+   * aggregate result into the aggregate buffer.
+   *
+   * @param partial The partial aggregate row.
+   * @param buffer The aggregate buffer that receives the merged result.
+   */
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val partialValue = partial.getField(minIndex).asInstanceOf[T]
+    if (partialValue != null) {
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
+      if (bufferValue != null) {
+        val min: T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
+        buffer.setField(minIndex, min)
+      } else {
+        buffer.setField(minIndex, partialValue)
+      }
+    }
+  }
+
+  /**
+   * Returns the final aggregated result based on the aggregate buffer.
+   *
+   * @param buffer The aggregate buffer holding the intermediate result.
+   * @return The final aggregate value.
+   */
+  override def evaluate(buffer: Row): T = {
+    buffer.getField(minIndex).asInstanceOf[T]
+  }
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggOffset: Int): Unit = {
+    minIndex = aggOffset
+  }
+}
+
+class ByteMinAggregate extends MinAggregate[Byte] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
+
+}
+
+class ShortMinAggregate extends MinAggregate[Short] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
+
+}
+
+class IntMinAggregate extends MinAggregate[Int] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
+
+}
+
+class LongMinAggregate extends MinAggregate[Long] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
+
+}
+
+class FloatMinAggregate extends MinAggregate[Float] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
+
+}
+
+class DoubleMinAggregate extends MinAggregate[Double] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
+
+}
+
+class BooleanMinAggregate extends MinAggregate[Boolean] {
+
+  override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
+
+}
+
+class DecimalMinAggregate extends Aggregate[BigDecimal] {
+
+  protected var minIndex: Int = _
+
+  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
+
+  override def initiate(intermediate: Row): Unit = {
+    intermediate.setField(minIndex, null)
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      initiate(partial)
+    } else {
+      partial.setField(minIndex, value)
+    }
+  }
+
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
+    if (partialValue != null) {
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
+      if (bufferValue != null) {
+        val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue
+        buffer.setField(minIndex, min)
+      } else {
+        buffer.setField(minIndex, partialValue)
+      }
+    }
+  }
+
+  override def evaluate(buffer: Row): BigDecimal = {
+    buffer.getField(minIndex).asInstanceOf[BigDecimal]
+  }
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggOffset: Int): Unit = {
+    minIndex = aggOffset
+  }
+}
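
For context, the lifecycle these methods implement can be exercised by hand.
A minimal, hypothetical driver (the object name and the hand-built Rows are
illustrative; it assumes the classes above are on the classpath):

    import org.apache.flink.types.Row

    object MinAggregateSketch {
      def main(args: Array[String]): Unit = {
        val agg = new IntMinAggregate
        agg.setAggOffsetInRow(0)

        // MapFunction side: one partial row per input value.
        val partials = Seq(7, 3, 5).map { v =>
          val partial = new Row(1)
          agg.prepare(v, partial)
          partial
        }

        // Combine/Reduce side: merge all partials into one buffer.
        val buffer = new Row(1)
        agg.initiate(buffer)
        partials.foreach(p => agg.merge(p, buffer))

        println(agg.evaluate(buffer))  // prints 3
      }
    }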

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
new file mode 100644
index 0000000..cd88112
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.types.Row
+
+abstract class SumAggregate[T: Numeric]
+  extends Aggregate[T] {
+
+  private val numeric = implicitly[Numeric[T]]
+  protected var sumIndex: Int = _
+
+  override def initiate(partial: Row): Unit = {
+    partial.setField(sumIndex, null)
+  }
+
+  override def merge(partial1: Row, buffer: Row): Unit = {
+    val partialValue = partial1.getField(sumIndex).asInstanceOf[T]
+    if (partialValue != null) {
+      val bufferValue = buffer.getField(sumIndex).asInstanceOf[T]
+      if (bufferValue != null) {
+        buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
+      } else {
+        buffer.setField(sumIndex, partialValue)
+      }
+    }
+  }
+
+  override def evaluate(buffer: Row): T = {
+    buffer.getField(sumIndex).asInstanceOf[T]
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      initiate(partial)
+    } else {
+      val input = value.asInstanceOf[T]
+      partial.setField(sumIndex, input)
+    }
+  }
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggOffset: Int): Unit = {
+    sumIndex = aggOffset
+  }
+}
+
+class ByteSumAggregate extends SumAggregate[Byte] {
+  override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
+}
+
+class ShortSumAggregate extends SumAggregate[Short] {
+  override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
+}
+
+class IntSumAggregate extends SumAggregate[Int] {
+  override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
+}
+
+class LongSumAggregate extends SumAggregate[Long] {
+  override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
+}
+
+class FloatSumAggregate extends SumAggregate[Float] {
+  override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
+}
+
+class DoubleSumAggregate extends SumAggregate[Double] {
+  override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
+}
+
+class DecimalSumAggregate extends Aggregate[BigDecimal] {
+
+  protected var sumIndex: Int = _
+
+  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
+
+  override def initiate(partial: Row): Unit = {
+    partial.setField(sumIndex, null)
+  }
+
+  override def merge(partial1: Row, buffer: Row): Unit = {
+    val partialValue = partial1.getField(sumIndex).asInstanceOf[BigDecimal]
+    if (partialValue != null) {
+      val bufferValue = buffer.getField(sumIndex).asInstanceOf[BigDecimal]
+      if (bufferValue != null) {
+        buffer.setField(sumIndex, partialValue.add(bufferValue))
+      } else {
+        buffer.setField(sumIndex, partialValue)
+      }
+    }
+  }
+
+  override def evaluate(buffer: Row): BigDecimal = {
+    buffer.getField(sumIndex).asInstanceOf[BigDecimal]
+  }
+
+  override def prepare(value: Any, partial: Row): Unit = {
+    if (value == null) {
+      initiate(partial)
+    } else {
+      val input = value.asInstanceOf[BigDecimal]
+      partial.setField(sumIndex, input)
+    }
+  }
+
+  override def supportPartial: Boolean = true
+
+  override def setAggOffsetInRow(aggOffset: Int): Unit = {
+    sumIndex = aggOffset
+  }
+}
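
A similar hypothetical sketch for the sum case, showing why initiate() stores
null: a group that never sees a non-null value evaluates to null rather than 0.
Same assumptions as the sketch above:

    import org.apache.flink.types.Row

    object SumAggregateSketch {
      def main(args: Array[String]): Unit = {
        val agg = new LongSumAggregate
        agg.setAggOffsetInRow(0)

        val buffer = new Row(1)
        agg.initiate(buffer)

        for (v <- Seq[Any](40L, null, 2L)) {
          val partial = new Row(1)
          agg.prepare(v, partial)  // a null input leaves the partial at null
          agg.merge(partial, buffer)
        }

        println(agg.evaluate(buffer))  // prints 42
      }
    }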

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
new file mode 100644
index 0000000..d2ac454
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
+  * collector.
+  */
+class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
+    extends Collector[Row] {
+
+  var wrappedCollector: Collector[Row] = _
+  var timeWindow: TimeWindow = _
+
+  override def collect(record: Row): Unit = {
+
+    val lastFieldPos = record.getArity - 1
+
+    if (windowStartOffset.isDefined) {
+      record.setField(
+        lastFieldPos + windowStartOffset.get,
+        SqlFunctions.internalToTimestamp(timeWindow.getStart))
+    }
+    if (windowEndOffset.isDefined) {
+      record.setField(
+        lastFieldPos + windowEndOffset.get,
+        SqlFunctions.internalToTimestamp(timeWindow.getEnd))
+    }
+    wrappedCollector.collect(record)
+  }
+
+  override def close(): Unit = wrappedCollector.close()
+}
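
A hypothetical wiring sketch: the offsets are relative to the last field of the
incoming row (getArity - 1), so a row of arity 3 that carries the window start
and end in its last two positions uses offsets -1 and 0. All names and values
are illustrative:

    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    object WindowCollectorSketch {
      def main(args: Array[String]): Unit = {
        val downstream = new Collector[Row] {
          override def collect(r: Row): Unit = println(r)
          override def close(): Unit = ()
        }

        // Row layout: [aggValue, windowStart, windowEnd]
        val collector = new TimeWindowPropertyCollector(Some(-1), Some(0))
        collector.wrappedCollector = downstream
        collector.timeWindow = new TimeWindow(0L, 5000L)

        val row = new Row(3)
        row.setField(0, 42L)
        collector.collect(row)  // prints the row with both timestamps filled in
      }
    }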

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
new file mode 100644
index 0000000..1a339e6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io
+
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.core.io.GenericInputSplit
+import org.slf4j.LoggerFactory
+
+class ValuesInputFormat[OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends GenericInputFormat[OUT]
+  with NonParallelInput
+  with ResultTypeQueryable[OUT]
+  with Compiler[GenericInputFormat[OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var format: GenericInputFormat[OUT] = _
+
+  override def open(split: GenericInputSplit): Unit = {
+    LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating GenericInputFormat.")
+    format = clazz.newInstance()
+  }
+
+  override def reachedEnd(): Boolean = format.reachedEnd()
+
+  override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
new file mode 100644
index 0000000..edfe113
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.Table
+
+/** Defines an external [[TableSink]] to emit a batch [[Table]].
+  *
+  * @tparam T Type of [[DataSet]] that this [[TableSink]] expects and supports.
+  */
+trait BatchTableSink[T] extends TableSink[T] {
+
+  /** Emits the DataSet. */
+  def emitDataSet(dataSet: DataSet[T]): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
new file mode 100644
index 0000000..9cf76dd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * A simple [[TableSink]] to emit data as CSV files.
+  *
+  * @param path The output path to write the Table to.
+  * @param fieldDelim The field delimiter, ',' by default.
+  */
+class CsvTableSink(
+    path: String,
+    fieldDelim: String = ",")
+  extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
+
+  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+    dataSet
+      .map(new CsvFormatter(fieldDelim))
+      .writeAsText(path)
+  }
+
+  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+    dataStream
+      .map(new CsvFormatter(fieldDelim))
+      .writeAsText(path)
+  }
+
+  override protected def copy: TableSinkBase[Row] = {
+    new CsvTableSink(path, fieldDelim)
+  }
+
+  override def getOutputType: TypeInformation[Row] = {
+    new RowTypeInfo(getFieldTypes: _*)
+  }
+}
+
+/**
+  * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
+  *
+  * @param fieldDelim The field delimiter.
+  */
+class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
+  override def map(row: Row): String = {
+
+    val builder = new StringBuilder
+
+    // write first value
+    val v = row.getField(0)
+    if (v != null) {
+      builder.append(v.toString)
+    }
+
+    // write following values
+    for (i <- 1 until row.getArity) {
+      builder.append(fieldDelim)
+      val v = row.getField(i)
+      if (v != null) {
+        builder.append(v.toString)
+      }
+    }
+    builder.mkString
+  }
+}
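
A minimal usage sketch, assuming the classes above are on the classpath. The
path, field names, and hand-built Row are placeholders; the Table API normally
calls configure() and emitDataSet() internally:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.types.Row

    object CsvSinkSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        val row = new Row(2)
        row.setField(0, 1)
        row.setField(1, "hello")
        val data = env.fromElements(row)

        val sink = new CsvTableSink("/tmp/out.csv", "|")
          .configure(
            Array("id", "name"),
            Array[TypeInformation[_]](
              BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
          .asInstanceOf[BatchTableSink[Row]]

        sink.emitDataSet(data)  // writes "1|hello"
        env.execute("csv sink sketch")
      }
    }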

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
new file mode 100644
index 0000000..360252e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
+
+/** Defines an external [[TableSink]] to emit a streaming [[Table]].
+  *
+  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
+  */
+trait StreamTableSink[T] extends TableSink[T] {
+
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[T]): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala
new file mode 100644
index 0000000..8304867
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+
+/** A [[TableSink]] specifies how to emit a [[Table]] to an external
+  * system or location.
+  *
+  * The interface is generic such that it can support different storage locations and formats.
+  *
+  * @tparam T The return type of the [[TableSink]].
+  */
+trait TableSink[T] {
+
+  /**
+    * Return the type expected by this [[TableSink]].
+    *
+    * This type should depend on the types returned by [[getFieldTypes]].
+    *
+    * @return The type expected by this [[TableSink]].
+    */
+  def getOutputType: TypeInformation[T]
+
+  /** Returns the names of the table fields. */
+  def getFieldNames: Array[String]
+
+  /** Returns the types of the table fields. */
+  def getFieldTypes: Array[TypeInformation[_]]
+
+  /**
+    * Return a copy of this [[TableSink]] configured with the field names and types of the
+    * [[Table]] to emit.
+    *
+    * @param fieldNames The field names of the table to emit.
+    * @param fieldTypes The field types of the table to emit.
+    * @return A copy of this [[TableSink]] configured with the field names and types of the
+    *         [[Table]] to emit.
+    */
+  def configure(fieldNames: Array[String],
+                fieldTypes: Array[TypeInformation[_]]): TableSink[T]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala
new file mode 100644
index 0000000..45866ca
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+
+trait TableSinkBase[T] extends TableSink[T] {
+
+  private var fieldNames: Option[Array[String]] = None
+  private var fieldTypes: Option[Array[TypeInformation[_]]] = None
+
+  /** Return a deep copy of the [[TableSink]]. */
+  protected def copy: TableSinkBase[T]
+
+  /** Return the field names of the [[Table]] to emit. */
+  def getFieldNames: Array[String] = {
+    fieldNames match {
+      case Some(n) => n
+      case None => throw new IllegalStateException(
+        "TableSink must be configured to retrieve field names.")
+    }
+  }
+
+  /** Return the field types of the [[Table]] to emit. */
+  def getFieldTypes: Array[TypeInformation[_]] = {
+    fieldTypes match {
+      case Some(t) => t
+      case None => throw new IllegalStateException(
+        "TableSink must be configured to retrieve field types.")
+    }
+  }
+
+  /**
+    * Return a copy of this [[TableSink]] configured with the field names and types of the
+    * [[Table]] to emit.
+    *
+    * @param fieldNames The field names of the table to emit.
+    * @param fieldTypes The field types of the table to emit.
+    * @return A copy of this [[TableSink]] configured with the field names and types of the
+    *         [[Table]] to emit.
+    */
+  final def configure(fieldNames: Array[String],
+                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
+
+    val configuredSink = this.copy
+    configuredSink.fieldNames = Some(fieldNames)
+    configuredSink.fieldTypes = Some(fieldTypes)
+
+    configuredSink
+  }
+}
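
A short sketch of the copy-on-configure contract defined here: configure()
never mutates the receiver but returns a configured copy. CsvTableSink from
above serves as the concrete sink; names and types are illustrative:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

    object ConfigureSketch {
      def main(args: Array[String]): Unit = {
        val original = new CsvTableSink("/tmp/out.csv")
        val configured = original.configure(
          Array("word", "count"),
          Array[TypeInformation[_]](
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO))

        configured.getFieldNames.foreach(println)  // word, count
        // original.getFieldNames would throw IllegalStateException:
        // only the returned copy carries the schema.
      }
    }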

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
new file mode 100644
index 0000000..0478dc9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
+
+/** Defines an external batch table and provides access to its data.
+  *
+  * @tparam T Type of the [[DataSet]] created by this [[TableSource]].
+  */
+trait BatchTableSource[T] extends TableSource[T] {
+
+  /**
+    * Returns the data of the table as a [[DataSet]].
+    *
+    * NOTE: This method is for internal use only when defining a [[TableSource]].
+    *       Do not use it in Table API programs.
+    */
+  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
new file mode 100644
index 0000000..3f4e395
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CsvInputFormat
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.io.RowCsvInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableException
+
+/**
+  * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
+  * (logically) unlimited number of fields.
+  *
+  * @param path The path to the CSV file.
+  * @param fieldNames The names of the table fields.
+  * @param fieldTypes The types of the table fields.
+  * @param fieldDelim The field delimiter, "," by default.
+  * @param rowDelim The row delimiter, "\n" by default.
+  * @param quoteCharacter An optional quote character for String values, null by default.
+  * @param ignoreFirstLine Flag to ignore the first line, false by default.
+  * @param ignoreComments An optional prefix to indicate comments, null by default.
+  * @param lenient Flag to skip records with parse errors instead of failing, false by default.
+  */
+class CsvTableSource(
+    path: String,
+    fieldNames: Array[String],
+    fieldTypes: Array[TypeInformation[_]],
+    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
+    quoteCharacter: Character = null,
+    ignoreFirstLine: Boolean = false,
+    ignoreComments: String = null,
+    lenient: Boolean = false)
+  extends BatchTableSource[Row]
+  with StreamTableSource[Row] {
+
+  /**
+    * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
+    * (logically) unlimited number of fields.
+    *
+    * @param path The path to the CSV file.
+    * @param fieldNames The names of the table fields.
+    * @param fieldTypes The types of the table fields.
+    */
+  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
+    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
+
+  if (fieldNames.length != fieldTypes.length) {
+    throw TableException("Number of field names and field types must be equal.")
+  }
+
+  private val returnType = new RowTypeInfo(fieldTypes: _*)
+
+  /**
+    * Returns the data of the table as a [[DataSet]] of [[Row]].
+    *
+    * NOTE: This method is for internal use only when defining a [[TableSource]].
+    *       Do not use it in Table API programs.
+    */
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+    execEnv.createInput(createCsvInput(), returnType)
+  }
+
+  /** Returns the types of the table fields. */
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  /** Returns the names of the table fields. */
+  override def getFieldsNames: Array[String] = fieldNames
+
+  /** Returns the number of fields of the table. */
+  override def getNumberOfFields: Int = fieldNames.length
+
+  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
+  override def getReturnType: RowTypeInfo = returnType
+
+  /**
+    * Returns the data of the table as a [[DataStream]] of [[Row]].
+    *
+    * NOTE: This method is for internal use only when defining a [[TableSource]].
+    *       Do not use it in Table API programs.
+    */
+  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
+    streamExecEnv.createInput(createCsvInput(), returnType)
+  }
+
+  private def createCsvInput(): RowCsvInputFormat = {
+    val inputFormat = new RowCsvInputFormat(new Path(path), returnType, rowDelim, fieldDelim)
+
+    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
+    inputFormat.setLenient(lenient)
+    if (quoteCharacter != null) {
+      inputFormat.enableQuotedStringParsing(quoteCharacter)
+    }
+    if (ignoreComments != null) {
+      inputFormat.setCommentPrefix(ignoreComments)
+    }
+
+    inputFormat
+  }
+}
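
A usage sketch with the convenience constructor, assuming a two-column CSV file
(path and schema are placeholders):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.ExecutionEnvironment

    object CsvSourceSketch {
      def main(args: Array[String]): Unit = {
        val source = new CsvTableSource(
          "/tmp/persons.csv",
          Array("name", "age"),
          Array[TypeInformation[_]](
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))

        val env = ExecutionEnvironment.getExecutionEnvironment
        source.getDataSet(env).print()  // each record is a Row(name, age)
      }
    }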

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
new file mode 100644
index 0000000..429cccb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+  * Adds support for projection push-down to a [[TableSource]].
+  * A [[TableSource]] extending this interface is able to project the fields of the return table.
+  *
+  * @tparam T The return type of the [[ProjectableTableSource]].
+  */
+trait ProjectableTableSource[T] {
+
+  /**
+    * Creates a copy of the [[ProjectableTableSource]] that projects its output on the specified
+    * fields.
+    *
+    * @param fields The indexes of the fields to return.
+    * @return A copy of the [[ProjectableTableSource]] that projects its output.
+    */
+  def projectFields(fields: Array[Int]): ProjectableTableSource[T]
+
+}
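
A hypothetical skeleton of an implementation: projecting means narrowing the
schema arrays (and, in a real source, the underlying input format) to the
selected indexes. Only the projection logic is shown:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.types.Row

    class MyProjectableSource(
        names: Array[String],
        types: Array[TypeInformation[_]])
      extends ProjectableTableSource[Row] {

      override def projectFields(fields: Array[Int]): MyProjectableSource =
        new MyProjectableSource(fields.map(names(_)), fields.map(types(_)))
    }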

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
new file mode 100644
index 0000000..7a2737c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+/** Defines an external stream table and provides access to its data.
+  *
+  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
+  */
+trait StreamTableSource[T] extends TableSource[T] {
+
+  /**
+    * Returns the data of the table as a [[DataStream]].
+    *
+    * NOTE: This method is for internal use only when defining a [[TableSource]].
+    *       Do not use it in Table API programs.
+    */
+  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
new file mode 100644
index 0000000..9d4ba68
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/** Defines an external table by providing schema information, i.e., field names and types.
+  *
+  * @tparam T The return type of the [[TableSource]].
+  */
+trait TableSource[T] {
+
+  /** Returns the number of fields of the table. */
+  def getNumberOfFields: Int
+
+  /** Returns the names of the table fields. */
+  def getFieldsNames: Array[String]
+
+  /** Returns the types of the table fields. */
+  def getFieldTypes: Array[TypeInformation[_]]
+
+  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
+  def getReturnType: TypeInformation[T]
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala
new file mode 100644
index 0000000..b4af152
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for internal types of the Table API that are used for translation purposes
+  * only and should not appear in the final plan.
+  */
+@SerialVersionUID(-13064574364925255L)
+abstract class InternalTypeInfo[T](val clazz: Class[T])
+  extends TypeInformation[T]
+  with AtomicType[T] {
+
+  checkNotNull(clazz)
+
+  override def isBasicType: Boolean =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def isTupleType: Boolean =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def getArity: Int =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def getTotalFields: Int =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def getTypeClass: Class[T] = clazz
+
+  override def isKeyType: Boolean =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def createComparator(
+      sortOrderAscending: Boolean,
+      executionConfig: ExecutionConfig)
+    : TypeComparator[T] =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def hashCode: Int = Objects.hash(clazz)
+
+  def canEqual(obj: Any): Boolean
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: InternalTypeInfo[_] =>
+        other.canEqual(this) && (this.clazz eq other.clazz)
+      case _ =>
+        false
+    }
+  }
+
+  override def toString: String = s"InternalTypeInfo"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala
new file mode 100644
index 0000000..bbc20aa
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo extends InternalTypeInfo[Long](classOf[Long]) {
+
+  def canEqual(obj: Any): Boolean = obj.isInstanceOf[RowIntervalTypeInfo]
+
+  override def toString: String = s"RowIntervalTypeInfo"
+}
+
+object RowIntervalTypeInfo {
+
+  val INTERVAL_ROWS = new RowIntervalTypeInfo()
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala
new file mode 100644
index 0000000..9d76050
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for SQL INTERVAL types.
+  */
+@SerialVersionUID(-1816179424364825258L)
+class TimeIntervalTypeInfo[T](
+    val clazz: Class[T],
+    val serializer: TypeSerializer[T],
+    val comparatorClass: Class[_ <: TypeComparator[T]])
+  extends TypeInformation[T]
+  with AtomicType[T] {
+
+  checkNotNull(clazz)
+  checkNotNull(serializer)
+  checkNotNull(comparatorClass)
+
+  override def isBasicType: Boolean = false
+
+  override def isTupleType: Boolean = false
+
+  override def getArity: Int = 1
+
+  override def getTotalFields: Int = 1
+
+  override def getTypeClass: Class[T] = clazz
+
+  override def isKeyType: Boolean = true
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
+
+  override def createComparator(
+      sortOrderAscending: Boolean,
+      executionConfig: ExecutionConfig)
+    : TypeComparator[T] = instantiateComparator(comparatorClass, sortOrderAscending)
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass)
+
+  def canEqual(obj: Any): Boolean = obj.isInstanceOf[TimeIntervalTypeInfo[_]]
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: TimeIntervalTypeInfo[_] =>
+        other.canEqual(this) &&
+          (this.clazz eq other.clazz) &&
+          serializer == other.serializer &&
+          (this.comparatorClass eq other.comparatorClass)
+      case _ =>
+        false
+    }
+  }
+
+  override def toString: String = s"TimeIntervalTypeInfo(${clazz.getSimpleName})"
+}
+
+object TimeIntervalTypeInfo {
+
+  val INTERVAL_MONTHS = new TimeIntervalTypeInfo(
+    classOf[java.lang.Integer],
+    IntSerializer.INSTANCE,
+    classOf[IntComparator])
+
+  val INTERVAL_MILLIS = new TimeIntervalTypeInfo(
+    classOf[java.lang.Long],
+    LongSerializer.INSTANCE,
+    classOf[LongComparator])
+
+  // ----------------------------------------------------------------------------------------------
+
+  private def instantiateComparator[X](
+      comparatorClass: Class[_ <: TypeComparator[X]],
+      ascendingOrder: java.lang.Boolean)
+    : TypeComparator[X] = {
+    try {
+      val constructor = comparatorClass.getConstructor(java.lang.Boolean.TYPE)
+      constructor.newInstance(ascendingOrder)
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(
+          s"Could not initialize comparator ${comparatorClass.getName}", e)
+    }
+  }
+
+}
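
A small sketch of how the two singletons behave at runtime, assuming the
classes above are on the classpath. The comparator is built reflectively from
its (boolean ascending) constructor:

    import org.apache.flink.api.common.ExecutionConfig

    object IntervalTypeSketch {
      def main(args: Array[String]): Unit = {
        val millis = TimeIntervalTypeInfo.INTERVAL_MILLIS
        println(millis.createSerializer(new ExecutionConfig))        // a LongSerializer
        println(millis.createComparator(true, new ExecutionConfig))  // a LongComparator
      }
    }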

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
new file mode 100644
index 0000000..40f0cf2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.validate._
+
+object TypeCheckUtils {
+
+  /**
+    * Checks if type information is an advanced type that can be converted to a
+    * SQL type but NOT vice versa.
+    */
+  def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: BasicTypeInfo[_] => false
+    case _: SqlTimeTypeInfo[_] => false
+    case _: TimeIntervalTypeInfo[_] => false
+    case _ => true
+  }
+
+  /**
+    * Checks if type information is a simple type that can be converted to a
+    * SQL type and vice versa.
+    */
+  def isSimple(dataType: TypeInformation[_]): Boolean = !isAdvanced(dataType)
+
+  def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: NumericTypeInfo[_] => true
+    case BIG_DEC_TYPE_INFO => true
+    case _ => false
+  }
+
+  def isTemporal(dataType: TypeInformation[_]): Boolean =
+    isTimePoint(dataType) || isTimeInterval(dataType)
+
+  def isTimePoint(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[SqlTimeTypeInfo[_]]
+
+  def isTimeInterval(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[TimeIntervalTypeInfo[_]]
+
+  def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
+
+  def isBoolean(dataType: TypeInformation[_]): Boolean = dataType == BOOLEAN_TYPE_INFO
+
+  def isDecimal(dataType: TypeInformation[_]): Boolean = dataType == BIG_DEC_TYPE_INFO
+
+  def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
+
+  def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
+    case _ => false
+  }
+
+  def isComparable(dataType: TypeInformation[_]): Boolean =
+    classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
+
+  def assertNumericExpr(
+      dataType: TypeInformation[_],
+      caller: String)
+    : ValidationResult = dataType match {
+    case _: NumericTypeInfo[_] =>
+      ValidationSuccess
+    case BIG_DEC_TYPE_INFO =>
+      ValidationSuccess
+    case _ =>
+      ValidationFailure(s"$caller requires numeric types, get $dataType here")
+  }
+
+  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ValidationResult = {
+    if (dataType.isSortKeyType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$caller requires orderable types, get $dataType here")
+    }
+  }
+}
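
Usage sketch (not part of the commit): a minimal example combining the
predicates and assertion helpers above, assuming the class from this diff is
on the classpath. Expected output is given in comments.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.table.typeutils.TypeCheckUtils._

    object TypeCheckUtilsExample {
      def main(args: Array[String]): Unit = {
        // basic types are "simple": convertible to SQL types and back
        println(isSimple(INT_TYPE_INFO))       // true
        println(isNumeric(BIG_DEC_TYPE_INFO))  // true
        println(isString(STRING_TYPE_INFO))    // true

        // the assertion helpers report problems as values instead of throwing
        println(assertNumericExpr(STRING_TYPE_INFO, "sum")) // ValidationFailure(...)
      }
    }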

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
new file mode 100644
index 0000000..99e94b3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
+
+/**
+  * Utilities for type conversions.
+  */
+object TypeCoercion {
+
+  val numericWideningPrecedence: IndexedSeq[TypeInformation[_]] =
+    IndexedSeq(
+      BYTE_TYPE_INFO,
+      SHORT_TYPE_INFO,
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      FLOAT_TYPE_INFO,
+      DOUBLE_TYPE_INFO)
+
+  def widerTypeOf(tp1: TypeInformation[_], tp2: TypeInformation[_]): Option[TypeInformation[_]] = {
+    (tp1, tp2) match {
+      case (ti1, ti2) if ti1 == ti2 => Some(ti1)
+
+      case (_, STRING_TYPE_INFO) => Some(STRING_TYPE_INFO)
+      case (STRING_TYPE_INFO, _) => Some(STRING_TYPE_INFO)
+
+      case (_, BIG_DEC_TYPE_INFO) => Some(BIG_DEC_TYPE_INFO)
+      case (BIG_DEC_TYPE_INFO, _) => Some(BIG_DEC_TYPE_INFO)
+
+      case (stti: SqlTimeTypeInfo[_], _: TimeIntervalTypeInfo[_]) => Some(stti)
+      case (_: TimeIntervalTypeInfo[_], stti: SqlTimeTypeInfo[_]) => Some(stti)
+
+      case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
+        val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
+        Some(numericWideningPrecedence(higherIndex))
+
+      case _ => None
+    }
+  }
+
+  /**
+    * Tests if a cast can be performed safely, i.e. without loss of information.
+    */
+  def canSafelyCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
+    case (_, STRING_TYPE_INFO) => true
+
+    case (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
+
+    case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
+      numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)
+
+    case _ => false
+  }
+
+  /**
+    * Tests if a cast between the given types is supported in flink-table.
+    * Note: The cast may lose information.
+    */
+  def canCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
+    case (fromTp, toTp) if fromTp == toTp => true
+
+    case (_, STRING_TYPE_INFO) => true
+
+    case (_, CHAR_TYPE_INFO) => false // Character type not supported.
+
+    case (STRING_TYPE_INFO, _: NumericTypeInfo[_]) => true
+    case (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
+    case (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) => true
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
+
+    case (BOOLEAN_TYPE_INFO, _: NumericTypeInfo[_]) => true
+    case (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) => true
+    case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) => true
+    case (BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
+
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => true
+    case (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_]) => true
+    case (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
+    case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
+    case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
+    case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
+    case (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) => true
+    case (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
+
+    case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) => false
+    case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false
+    case (_: SqlTimeTypeInfo[_], _: SqlTimeTypeInfo[_]) => true
+    case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) => true
+    case (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => true
+    case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => true
+
+    case (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) => true
+    case (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true
+
+    case _ => false
+  }
+}
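
Usage sketch (not part of the commit): how the widening and cast checks above
behave for a few numeric types, assuming the class is on the classpath.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.table.typeutils.TypeCoercion

    object TypeCoercionExample {
      def main(args: Array[String]): Unit = {
        // widening follows BYTE < SHORT < INT < LONG < FLOAT < DOUBLE
        println(TypeCoercion.widerTypeOf(INT_TYPE_INFO, DOUBLE_TYPE_INFO)) // Some(Double)

        // INT -> LONG widens without loss, while LONG -> INT may truncate
        println(TypeCoercion.canSafelyCast(INT_TYPE_INFO, LONG_TYPE_INFO)) // true
        println(TypeCoercion.canSafelyCast(LONG_TYPE_INFO, INT_TYPE_INFO)) // false

        // an explicit, possibly lossy cast is still supported
        println(TypeCoercion.canCast(LONG_TYPE_INFO, INT_TYPE_INFO))       // true
      }
    }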

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
new file mode 100644
index 0000000..a2a120b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.core.JoinRelType._
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+import scala.collection.JavaConversions._
+
+object TypeConverter {
+
+  val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
+
+  /**
+    * Determines the return type of Flink operators based on the logical fields, the expected
+    * physical type and configuration parameters.
+    *
+    * For example:
+    *   - No physical type expected, only 3 non-null fields and efficient type usage enabled
+    *       -> return Tuple3
+    *   - No physical type expected, efficient type usage enabled, but 3 nullable fields
+    *       -> return Row because Tuple does not support null values
+    *   - Physical type expected
+    *       -> check if physical type is compatible and return it
+    *
+    * @param logicalRowType logical row information
+    * @param expectedPhysicalType expected physical type
+    * @param nullable fields can be nullable
+    * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
+    * @return suitable return type
+    */
+  def determineReturnType(
+      logicalRowType: RelDataType,
+      expectedPhysicalType: Option[TypeInformation[Any]],
+      nullable: Boolean,
+      useEfficientTypes: Boolean)
+    : TypeInformation[Any] = {
+    // convert to type information
+    val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
+      FlinkTypeFactory.toTypeInfo(relDataType.getType)
+    }
+    // field names
+    val logicalFieldNames = logicalRowType.getFieldNames.toList
+
+    val returnType = expectedPhysicalType match {
+      // a certain physical type is expected (but not Row)
+      // check if expected physical type is compatible with logical field type
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        if (typeInfo.getArity != logicalFieldTypes.length) {
+          throw new TableException("Arity of result does not match expected type.")
+        }
+        typeInfo match {
+
+          // POJO type expected
+          case pt: PojoTypeInfo[_] =>
+            logicalFieldNames.zip(logicalFieldTypes) foreach {
+              case (fName, fType) =>
+                val pojoIdx = pt.getFieldIndex(fName)
+                if (pojoIdx < 0) {
+                  throw new TableException(s"POJO does not define field name: $fName")
+                }
+                val expectedTypeInfo = pt.getTypeAt(pojoIdx)
+                if (fType != expectedTypeInfo) {
+                  throw new TableException(s"Result field does not match expected type. " +
+                    s"Expected: $expectedTypeInfo; Actual: $fType")
+                }
+            }
+
+          // Tuple/Case class type expected
+          case ct: CompositeType[_] =>
+            logicalFieldTypes.zipWithIndex foreach {
+              case (fieldTypeInfo, i) =>
+                val expectedTypeInfo = ct.getTypeAt(i)
+                if (fieldTypeInfo != expectedTypeInfo) {
+                  throw new TableException(s"Result field does not match expected type. " +
+                    s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
+                }
+            }
+
+          // Atomic type expected
+          case at: AtomicType[_] =>
+            val fieldTypeInfo = logicalFieldTypes.head
+            if (fieldTypeInfo != at) {
+              throw new TableException(s"Result field does not match expected type. " +
+                s"Expected: $at; Actual: $fieldTypeInfo")
+            }
+
+          case _ =>
+            throw new TableException("Unsupported result type.")
+        }
+        typeInfo
+
+      // Row is expected, create a RowTypeInfo with the logical field types
+      case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
+        new RowTypeInfo(logicalFieldTypes: _*)
+
+      // no physical type
+      // determine type based on logical fields and configuration parameters
+      case None =>
+        // use Row if efficient types are not required,
+        // if the row arity exceeds Tuple.MAX_ARITY, or if fields are nullable
+        if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
+          new RowTypeInfo(logicalFieldTypes: _*)
+        }
+        // otherwise use an efficient type: a Tuple or an atomic type
+        else {
+          if (logicalFieldTypes.length == 1) {
+            logicalFieldTypes.head
+          }
+          else {
+            new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
+          }
+        }
+    }
+    returnType.asInstanceOf[TypeInformation[Any]]
+  }
+
+  def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
+    case INNER => JoinType.INNER
+    case LEFT => JoinType.LEFT_OUTER
+    case RIGHT => JoinType.RIGHT_OUTER
+    case FULL => JoinType.FULL_OUTER
+  }
+
+  def flinkJoinTypeToRelType(joinType: JoinType): JoinRelType = joinType match {
+    case JoinType.INNER => JoinRelType.INNER
+    case JoinType.LEFT_OUTER => JoinRelType.LEFT
+    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
+    case JoinType.FULL_OUTER => JoinRelType.FULL
+  }
+}
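
Usage sketch (not part of the commit): the join type conversions above form a
round trip between Calcite's JoinRelType and Flink's JoinType, assuming both
libraries are on the classpath.

    import org.apache.calcite.rel.core.JoinRelType
    import org.apache.flink.api.java.operators.join.JoinType
    import org.apache.flink.table.typeutils.TypeConverter._

    object JoinTypeConversionExample {
      def main(args: Array[String]): Unit = {
        println(sqlJoinTypeToFlinkJoinType(JoinRelType.LEFT)) // LEFT_OUTER
        println(flinkJoinTypeToRelType(JoinType.FULL_OUTER))  // FULL

        // converting back and forth yields the original value
        assert(flinkJoinTypeToRelType(
          sqlJoinTypeToFlinkJoinType(JoinRelType.INNER)) == JoinRelType.INNER)
      }
    }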

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
new file mode 100644
index 0000000..f92b3a1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.validate
+
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
+import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.functions.utils.{TableSqlFunction, UserDefinedFunctionUtils}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+/**
+  * A catalog for looking up (user-defined) functions, used during validation phases
+  * of both the Table API and the SQL API.
+  */
+class FunctionCatalog {
+
+  private val functionBuilders = mutable.HashMap.empty[String, Class[_]]
+  private val sqlFunctions = mutable.ListBuffer[SqlFunction]()
+
+  def registerFunction(name: String, builder: Class[_]): Unit =
+    functionBuilders.put(name.toLowerCase, builder)
+
+  def registerSqlFunction(sqlFunction: SqlFunction): Unit = {
+    sqlFunctions --= sqlFunctions.filter(_.getName == sqlFunction.getName)
+    sqlFunctions += sqlFunction
+  }
+
+  /**
+    * Registers multiple SQL functions at once. All functions must have the same name.
+    */
+  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
+    if (functions.nonEmpty) {
+      val name = functions.head.getName
+      // check that all functions have the same name
+      if (functions.forall(_.getName == name)) {
+        sqlFunctions --= sqlFunctions.filter(_.getName == name)
+        sqlFunctions ++= functions
+      } else {
+        throw ValidationException("The SQL functions to be registered have different names.")
+      }
+    }
+  }
+
+  def getSqlOperatorTable: SqlOperatorTable =
+    ChainedSqlOperatorTable.of(
+      new BasicOperatorTable(),
+      new ListSqlOperatorTable(sqlFunctions)
+    )
+
+  /**
+    * Looks up a function by name and creates an expression if a match is found.
+    */
+  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    val funcClass = functionBuilders
+      .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined function: $name"))
+
+    // Instantiate a function using the provided `children`
+    funcClass match {
+
+      // user-defined scalar function call
+      case sf if classOf[ScalarFunction].isAssignableFrom(sf) =>
+        Try(UserDefinedFunctionUtils.instantiate(sf.asInstanceOf[Class[ScalarFunction]])) match {
+          case Success(scalarFunction) => ScalarFunctionCall(scalarFunction, children)
+          case Failure(e) => throw ValidationException(e.getMessage)
+        }
+
+      // user-defined table function call
+      case tf if classOf[TableFunction[_]].isAssignableFrom(tf) =>
+        val tableSqlFunction = sqlFunctions
+          .find(f => f.getName.equalsIgnoreCase(name) && f.isInstanceOf[TableSqlFunction])
+          .getOrElse(throw ValidationException(s"Undefined table function: $name"))
+          .asInstanceOf[TableSqlFunction]
+        val typeInfo = tableSqlFunction.getRowTypeInfo
+        val function = tableSqlFunction.getTableFunction
+        TableFunctionCall(name, function, children, typeInfo)
+
+      // general expression call
+      case expression if classOf[Expression].isAssignableFrom(expression) =>
+        // try to find a constructor that accepts a `Seq[Expression]`
+        Try(funcClass.getDeclaredConstructor(classOf[Seq[_]])) match {
+          case Success(seqCtor) =>
+            Try(seqCtor.newInstance(children).asInstanceOf[Expression]) match {
+              case Success(expr) => expr
+              case Failure(e) => throw ValidationException(e.getMessage)
+            }
+          case Failure(e) =>
+            val childrenClass = Seq.fill(children.length)(classOf[Expression])
+            // try to find a constructor matching the exact number of children
+            Try(funcClass.getDeclaredConstructor(childrenClass: _*)) match {
+              case Success(ctor) =>
+                Try(ctor.newInstance(children: _*).asInstanceOf[Expression]) match {
+                  case Success(expr) => expr
+                  case Failure(exception) => throw ValidationException(exception.getMessage)
+                }
+              case Failure(exception) =>
+                throw ValidationException(s"Invalid number of arguments for function $funcClass")
+            }
+        }
+
+      case _ =>
+        throw ValidationException("Unsupported function.")
+    }
+  }
+
+  /**
+    * Drops a function and returns whether the function existed.
+    */
+  def dropFunction(name: String): Boolean =
+    functionBuilders.remove(name.toLowerCase).isDefined
+
+  /**
+    * Drops all registered functions.
+    */
+  def clear(): Unit = functionBuilders.clear()
+}
+
+object FunctionCatalog {
+
+  val builtInFunctions: Map[String, Class[_]] = Map(
+    // logic
+    "isNull" -> classOf[IsNull],
+    "isNotNull" -> classOf[IsNotNull],
+    "isTrue" -> classOf[IsTrue],
+    "isFalse" -> classOf[IsFalse],
+    "isNotTrue" -> classOf[IsNotTrue],
+    "isNotFalse" -> classOf[IsNotFalse],
+
+    // aggregate functions
+    "avg" -> classOf[Avg],
+    "count" -> classOf[Count],
+    "max" -> classOf[Max],
+    "min" -> classOf[Min],
+    "sum" -> classOf[Sum],
+
+    // string functions
+    "charLength" -> classOf[CharLength],
+    "initCap" -> classOf[InitCap],
+    "like" -> classOf[Like],
+    "lowerCase" -> classOf[Lower],
+    "similar" -> classOf[Similar],
+    "substring" -> classOf[Substring],
+    "trim" -> classOf[Trim],
+    "upperCase" -> classOf[Upper],
+    "position" -> classOf[Position],
+    "overlay" -> classOf[Overlay],
+
+    // math functions
+    "abs" -> classOf[Abs],
+    "ceil" -> classOf[Ceil],
+    "exp" -> classOf[Exp],
+    "floor" -> classOf[Floor],
+    "log10" -> classOf[Log10],
+    "ln" -> classOf[Ln],
+    "power" -> classOf[Power],
+    "mod" -> classOf[Mod],
+    "sqrt" -> classOf[Sqrt],
+
+    // temporal functions
+    "extract" -> classOf[Extract],
+    "currentDate" -> classOf[CurrentDate],
+    "currentTime" -> classOf[CurrentTime],
+    "currentTimestamp" -> classOf[CurrentTimestamp],
+    "localTime" -> classOf[LocalTime],
+    "localTimestamp" -> classOf[LocalTimestamp],
+    "quarter" -> classOf[Quarter],
+    "temporalOverlaps" -> classOf[TemporalOverlaps],
+
+    // array
+    "cardinality" -> classOf[ArrayCardinality],
+    "at" -> classOf[ArrayElementAt],
+    "element" -> classOf[ArrayElement]
+
+    // TODO implement function overloading here
+    // "floor" -> classOf[TemporalFloor]
+    // "ceil" -> classOf[TemporalCeil]
+  )
+
+  /**
+    * Creates a new function catalog with built-in functions.
+    */
+  def withBuiltIns: FunctionCatalog = {
+    val catalog = new FunctionCatalog()
+    builtInFunctions.foreach { case (n, c) => catalog.registerFunction(n, c) }
+    catalog
+  }
+}
+
+class BasicOperatorTable extends ReflectiveSqlOperatorTable {
+
+  /**
+    * List of supported SQL operators / functions.
+    *
+    * This list should be kept in sync with [[SqlStdOperatorTable]].
+    */
+  private val builtInSqlOperators: Seq[SqlOperator] = Seq(
+    // SET OPERATORS
+    SqlStdOperatorTable.UNION,
+    SqlStdOperatorTable.UNION_ALL,
+    SqlStdOperatorTable.EXCEPT,
+    SqlStdOperatorTable.EXCEPT_ALL,
+    SqlStdOperatorTable.INTERSECT,
+    SqlStdOperatorTable.INTERSECT_ALL,
+    // BINARY OPERATORS
+    SqlStdOperatorTable.AND,
+    SqlStdOperatorTable.AS,
+    SqlStdOperatorTable.CONCAT,
+    SqlStdOperatorTable.DIVIDE,
+    SqlStdOperatorTable.DIVIDE_INTEGER,
+    SqlStdOperatorTable.DOT,
+    SqlStdOperatorTable.EQUALS,
+    SqlStdOperatorTable.GREATER_THAN,
+    SqlStdOperatorTable.IS_DISTINCT_FROM,
+    SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
+    SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+    SqlStdOperatorTable.LESS_THAN,
+    SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+    SqlStdOperatorTable.MINUS,
+    SqlStdOperatorTable.MULTIPLY,
+    SqlStdOperatorTable.NOT_EQUALS,
+    SqlStdOperatorTable.OR,
+    SqlStdOperatorTable.PLUS,
+    SqlStdOperatorTable.DATETIME_PLUS,
+    // POSTFIX OPERATORS
+    SqlStdOperatorTable.DESC,
+    SqlStdOperatorTable.NULLS_FIRST,
+    SqlStdOperatorTable.IS_NOT_NULL,
+    SqlStdOperatorTable.IS_NULL,
+    SqlStdOperatorTable.IS_NOT_TRUE,
+    SqlStdOperatorTable.IS_TRUE,
+    SqlStdOperatorTable.IS_NOT_FALSE,
+    SqlStdOperatorTable.IS_FALSE,
+    SqlStdOperatorTable.IS_NOT_UNKNOWN,
+    SqlStdOperatorTable.IS_UNKNOWN,
+    // PREFIX OPERATORS
+    SqlStdOperatorTable.NOT,
+    SqlStdOperatorTable.UNARY_MINUS,
+    SqlStdOperatorTable.UNARY_PLUS,
+    // AGGREGATE OPERATORS
+    SqlStdOperatorTable.SUM,
+    SqlStdOperatorTable.COUNT,
+    SqlStdOperatorTable.MIN,
+    SqlStdOperatorTable.MAX,
+    SqlStdOperatorTable.AVG,
+    // ARRAY OPERATORS
+    SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
+    SqlStdOperatorTable.ITEM,
+    SqlStdOperatorTable.CARDINALITY,
+    SqlStdOperatorTable.ELEMENT,
+    // SPECIAL OPERATORS
+    SqlStdOperatorTable.ROW,
+    SqlStdOperatorTable.OVERLAPS,
+    SqlStdOperatorTable.LITERAL_CHAIN,
+    SqlStdOperatorTable.BETWEEN,
+    SqlStdOperatorTable.SYMMETRIC_BETWEEN,
+    SqlStdOperatorTable.NOT_BETWEEN,
+    SqlStdOperatorTable.SYMMETRIC_NOT_BETWEEN,
+    SqlStdOperatorTable.NOT_LIKE,
+    SqlStdOperatorTable.LIKE,
+    SqlStdOperatorTable.NOT_SIMILAR_TO,
+    SqlStdOperatorTable.SIMILAR_TO,
+    SqlStdOperatorTable.CASE,
+    SqlStdOperatorTable.REINTERPRET,
+    SqlStdOperatorTable.EXTRACT_DATE,
+    // FUNCTIONS
+    SqlStdOperatorTable.SUBSTRING,
+    SqlStdOperatorTable.OVERLAY,
+    SqlStdOperatorTable.TRIM,
+    SqlStdOperatorTable.POSITION,
+    SqlStdOperatorTable.CHAR_LENGTH,
+    SqlStdOperatorTable.CHARACTER_LENGTH,
+    SqlStdOperatorTable.UPPER,
+    SqlStdOperatorTable.LOWER,
+    SqlStdOperatorTable.INITCAP,
+    SqlStdOperatorTable.POWER,
+    SqlStdOperatorTable.SQRT,
+    SqlStdOperatorTable.MOD,
+    SqlStdOperatorTable.LN,
+    SqlStdOperatorTable.LOG10,
+    SqlStdOperatorTable.ABS,
+    SqlStdOperatorTable.EXP,
+    SqlStdOperatorTable.NULLIF,
+    SqlStdOperatorTable.COALESCE,
+    SqlStdOperatorTable.FLOOR,
+    SqlStdOperatorTable.CEIL,
+    SqlStdOperatorTable.LOCALTIME,
+    SqlStdOperatorTable.LOCALTIMESTAMP,
+    SqlStdOperatorTable.CURRENT_TIME,
+    SqlStdOperatorTable.CURRENT_TIMESTAMP,
+    SqlStdOperatorTable.CURRENT_DATE,
+    SqlStdOperatorTable.CAST,
+    SqlStdOperatorTable.EXTRACT,
+    SqlStdOperatorTable.QUARTER,
+    SqlStdOperatorTable.SCALAR_QUERY,
+    SqlStdOperatorTable.EXISTS
+  )
+
+  builtInSqlOperators.foreach(register)
+}
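
Usage sketch (not part of the commit): registering and dropping a function on
the built-in catalog. It assumes the expression class Upper from
org.apache.flink.table.expressions is accessible from user code.

    import org.apache.flink.table.expressions.Upper
    import org.apache.flink.table.validate.FunctionCatalog

    object FunctionCatalogExample {
      def main(args: Array[String]): Unit = {
        val catalog = FunctionCatalog.withBuiltIns

        // names are lower-cased on registration, so lookups are case-insensitive
        catalog.registerFunction("myUpper", classOf[Upper])
        println(catalog.dropFunction("MYUPPER")) // true
        println(catalog.dropFunction("myUpper")) // false, already removed
      }
    }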

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
new file mode 100644
index 0000000..64a568b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.validate
+
+/**
+  * Represents the result of a validation.
+  */
+sealed trait ValidationResult {
+  def isFailure: Boolean = !isSuccess
+  def isSuccess: Boolean
+
+  /**
+    * Allows constructing a cascade of validation results.
+    * The first failure result will be returned.
+    */
+  def orElse(other: ValidationResult): ValidationResult = {
+    if (isSuccess) {
+      other
+    } else {
+      this
+    }
+  }
+}
+
+/**
+  * Represents the successful result of a validation.
+  */
+object ValidationSuccess extends ValidationResult {
+  val isSuccess: Boolean = true
+}
+
+/**
+  * Represents the failing result of a validation,
+  * with an error message describing the reason for the failure.
+  */
+case class ValidationFailure(message: String) extends ValidationResult {
+  val isSuccess: Boolean = false
+}
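
Usage sketch (not part of the commit): chaining checks with orElse, which
returns the first failure in the cascade.

    import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}

    object ValidationResultExample {
      def main(args: Array[String]): Unit = {
        val combined = ValidationSuccess
          .orElse(ValidationFailure("first failure"))
          .orElse(ValidationFailure("second failure"))

        println(combined)           // ValidationFailure(first failure)
        println(combined.isFailure) // true
      }
    }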