You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:45 UTC
[16/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
new file mode 100644
index 0000000..88cb058
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/MinAggregate.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.types.Row
+
+abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
+ // Position of the minimum field in the intermediate row; assigned by setAggOffsetInRow.
+ protected var minIndex: Int = _
+
+ /**
+ * Resets the minimum field of the intermediate row to null.
+ *
+ * @param intermediate The intermediate aggregate row to initiate.
+ */
+ override def initiate(intermediate: Row): Unit = {
+ intermediate.setField(minIndex, null)
+ }
+
+ /**
+ * Accessed in MapFunction, prepare the input of partial aggregate.
+ *
+ * @param value The input value; a null value resets the minimum field via initiate.
+ * @param partial The partial aggregate row whose minimum field receives the value.
+ */
+ override def prepare(value: Any, partial: Row): Unit = {
+ if (value == null) {
+ initiate(partial)
+ } else {
+ partial.setField(minIndex, value)
+ }
+ }
+
+ /**
+ * Accessed in CombineFunction and GroupReduceFunction, merge partial
+ * aggregate result into aggregate buffer.
+ *
+ * @param partial The partial aggregate row; it is ignored if its minimum field is null.
+ * @param buffer The aggregate buffer; its minimum field is updated in place.
+ */
+ override def merge(partial: Row, buffer: Row): Unit = {
+ val partialValue = partial.getField(minIndex).asInstanceOf[T]
+ if (partialValue != null) {
+ val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
+ if (bufferValue != null) {
+ val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
+ buffer.setField(minIndex, min)
+ } else {
+ buffer.setField(minIndex, partialValue)
+ }
+ }
+ }
+
+ /**
+ * Return the final aggregated result based on aggregate buffer.
+ *
+ * @param buffer The aggregate buffer to read the minimum field from.
+ * @return The content of the buffer's minimum field, cast to T.
+ */
+ override def evaluate(buffer: Row): T = {
+ buffer.getField(minIndex).asInstanceOf[T]
+ }
+
+ override def supportPartial: Boolean = true
+ /** Sets the position of the minimum field in the intermediate row. */
+ override def setAggOffsetInRow(aggOffset: Int): Unit = {
+ minIndex = aggOffset
+ }
+}
+
+class ByteMinAggregate extends MinAggregate[Byte] {
+ // Minimum over Byte values; the intermediate field is declared as BYTE_TYPE_INFO.
+ override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
+
+}
+
+class ShortMinAggregate extends MinAggregate[Short] {
+ // Minimum over Short values; the intermediate field is declared as SHORT_TYPE_INFO.
+ override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
+
+}
+
+class IntMinAggregate extends MinAggregate[Int] {
+ // Minimum over Int values; the intermediate field is declared as INT_TYPE_INFO.
+ override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
+
+}
+
+class LongMinAggregate extends MinAggregate[Long] {
+ // Minimum over Long values; the intermediate field is declared as LONG_TYPE_INFO.
+ override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
+
+}
+
+class FloatMinAggregate extends MinAggregate[Float] {
+ // Minimum over Float values; the intermediate field is declared as FLOAT_TYPE_INFO.
+ override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
+
+}
+
+class DoubleMinAggregate extends MinAggregate[Double] {
+ // Minimum over Double values; the intermediate field is declared as DOUBLE_TYPE_INFO.
+ override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
+
+}
+
+class BooleanMinAggregate extends MinAggregate[Boolean] {
+ // Minimum over Boolean values, compared with the implicit Ordering[Boolean] (false < true).
+ override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
+
+}
+
+class DecimalMinAggregate extends Aggregate[BigDecimal] {
+
+  // Position of the minimum field inside the intermediate row.
+  protected var minIndex: Int = _
+
+  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
+
+  /** Clears the minimum field of the given row, i.e. sets it to null. */
+  override def initiate(intermediate: Row): Unit =
+    intermediate.setField(minIndex, null)
+
+  /** Copies the input into the partial row; a null input clears the field instead. */
+  override def prepare(value: Any, partial: Row): Unit =
+    if (value != null) {
+      partial.setField(minIndex, value)
+    } else {
+      initiate(partial)
+    }
+
+  /** Folds the partial's minimum into the buffer; a null partial leaves the buffer untouched. */
+  override def merge(partial: Row, buffer: Row): Unit = {
+    val candidate = partial.getField(minIndex).asInstanceOf[BigDecimal]
+    if (candidate != null) {
+      val current = buffer.getField(minIndex).asInstanceOf[BigDecimal]
+      // The buffered value wins ties; it is replaced only when absent or strictly larger.
+      val smallest =
+        if (current == null || candidate.compareTo(current) < 0) candidate else current
+      buffer.setField(minIndex, smallest)
+    }
+  }
+
+  /** Returns the content of the buffer's minimum field as a BigDecimal. */
+  override def evaluate(buffer: Row): BigDecimal =
+    buffer.getField(minIndex).asInstanceOf[BigDecimal]
+
+  override def supportPartial: Boolean = true
+
+  /** Records the position of the minimum field in the intermediate row. */
+  override def setAggOffsetInRow(aggOffset: Int): Unit =
+    minIndex = aggOffset
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
new file mode 100644
index 0000000..cd88112
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SumAggregate.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.types.Row
+
+abstract class SumAggregate[T: Numeric]
+ extends Aggregate[T] {
+ // Numeric instance of T that merge uses to add two values.
+ private val numeric = implicitly[Numeric[T]]
+ protected var sumIndex: Int = _
+ /** Resets the sum field of the given row to null. */
+ override def initiate(partial: Row): Unit = {
+ partial.setField(sumIndex, null)
+ }
+ /** Adds the partial's sum field to the buffer's; a null partial value changes nothing. */
+ override def merge(partial1: Row, buffer: Row): Unit = {
+ val partialValue = partial1.getField(sumIndex).asInstanceOf[T]
+ if (partialValue != null) {
+ val bufferValue = buffer.getField(sumIndex).asInstanceOf[T]
+ if (bufferValue != null) {
+ buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
+ } else {
+ buffer.setField(sumIndex, partialValue)
+ }
+ }
+ }
+ /** Returns the content of the buffer's sum field, cast to T. */
+ override def evaluate(buffer: Row): T = {
+ buffer.getField(sumIndex).asInstanceOf[T]
+ }
+ /** Stores the input value in the partial's sum field; a null input resets the field. */
+ override def prepare(value: Any, partial: Row): Unit = {
+ if (value == null) {
+ initiate(partial)
+ } else {
+ val input = value.asInstanceOf[T]
+ partial.setField(sumIndex, input)
+ }
+ }
+
+ override def supportPartial: Boolean = true
+ /** Sets the position of the sum field in the intermediate row. */
+ override def setAggOffsetInRow(aggOffset: Int): Unit = {
+ sumIndex = aggOffset
+ }
+}
+// Sum over Byte values; the intermediate field is declared as BYTE_TYPE_INFO.
+class ByteSumAggregate extends SumAggregate[Byte] {
+ override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
+}
+// Sum over Short values; the intermediate field is declared as SHORT_TYPE_INFO.
+class ShortSumAggregate extends SumAggregate[Short] {
+ override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
+}
+// Sum over Int values; the intermediate field is declared as INT_TYPE_INFO.
+class IntSumAggregate extends SumAggregate[Int] {
+ override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
+}
+// Sum over Long values; the intermediate field is declared as LONG_TYPE_INFO.
+class LongSumAggregate extends SumAggregate[Long] {
+ override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
+}
+// Sum over Float values; the intermediate field is declared as FLOAT_TYPE_INFO.
+class FloatSumAggregate extends SumAggregate[Float] {
+ override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
+}
+// Sum over Double values; the intermediate field is declared as DOUBLE_TYPE_INFO.
+class DoubleSumAggregate extends SumAggregate[Double] {
+ override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
+}
+
+class DecimalSumAggregate extends Aggregate[BigDecimal] {
+
+  // Position of the sum field inside the intermediate row.
+  protected var sumIndex: Int = _
+
+  override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
+
+  /** Clears the sum field of the given row, i.e. sets it to null. */
+  override def initiate(partial: Row): Unit =
+    partial.setField(sumIndex, null)
+
+  /** Adds the partial's sum to the buffer; a null partial leaves the buffer untouched. */
+  override def merge(partial1: Row, buffer: Row): Unit = {
+    val addend = partial1.getField(sumIndex).asInstanceOf[BigDecimal]
+    if (addend != null) {
+      val accumulated = buffer.getField(sumIndex).asInstanceOf[BigDecimal]
+      // A null buffer field is simply replaced by the partial's value.
+      val total =
+        if (accumulated == null) addend else addend.add(accumulated)
+      buffer.setField(sumIndex, total)
+    }
+  }
+
+  /** Returns the content of the buffer's sum field as a BigDecimal. */
+  override def evaluate(buffer: Row): BigDecimal =
+    buffer.getField(sumIndex).asInstanceOf[BigDecimal]
+
+  /** Stores the input in the partial row; a null input clears the field instead. */
+  override def prepare(value: Any, partial: Row): Unit =
+    if (value != null) {
+      partial.setField(sumIndex, value.asInstanceOf[BigDecimal])
+    } else {
+      initiate(partial)
+    }
+
+  override def supportPartial: Boolean = true
+
+  /** Records the position of the sum field in the intermediate row. */
+  override def setAggOffsetInRow(aggOffset: Int): Unit =
+    sumIndex = aggOffset
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
new file mode 100644
index 0000000..d2ac454
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+ * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
+ * collector.
+ */
+class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
+  extends Collector[Row] {
+
+  // Both start out as null and have to be assigned before collect() is called.
+  var wrappedCollector: Collector[Row] = _
+  var timeWindow: TimeWindow = _
+
+  /** Writes the window start/end timestamps into the record, then forwards the record. */
+  override def collect(record: Row): Unit = {
+    // The offsets are applied relative to the position of the record's last field.
+    val base = record.getArity - 1
+
+    windowStartOffset.foreach { offset =>
+      record.setField(base + offset, SqlFunctions.internalToTimestamp(timeWindow.getStart))
+    }
+    windowEndOffset.foreach { offset =>
+      record.setField(base + offset, SqlFunctions.internalToTimestamp(timeWindow.getEnd))
+    }
+
+    wrappedCollector.collect(record)
+  }
+
+  /** Closes the wrapped collector. */
+  override def close(): Unit = wrappedCollector.close()
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
new file mode 100644
index 0000000..1a339e6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io
+
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.core.io.GenericInputSplit
+import org.slf4j.LoggerFactory
+/** Non-parallel input format that delegates to a format compiled from `code` in open(). */
+class ValuesInputFormat[OUT](
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[OUT])
+ extends GenericInputFormat[OUT]
+ with NonParallelInput
+ with ResultTypeQueryable[OUT]
+ with Compiler[GenericInputFormat[OUT]] {
+ // Logger named after the runtime class of this format.
+ val LOG = LoggerFactory.getLogger(this.getClass)
+ // Compiled delegate that reachedEnd/nextRecord forward to; null until open() has run.
+ private var format: GenericInputFormat[OUT] = _
+ /** Compiles `code` with the user-code class loader and instantiates the resulting class. */
+ override def open(split: GenericInputSplit): Unit = {
+ LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating GenericInputFormat.")
+ format = clazz.newInstance()
+ }
+ /** Delegates to the compiled format. */
+ override def reachedEnd(): Boolean = format.reachedEnd()
+ /** Delegates to the compiled format. */
+ override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
+ // NOTE(review): returnType is @transient, presumably null after deserialization -- confirm.
+ override def getProducedType: TypeInformation[OUT] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
new file mode 100644
index 0000000..edfe113
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.Table
+
+/** Defines an external [[TableSink]] to emit a batch [[Table]].
+ *
+ * @tparam T Type of [[DataSet]] that this [[TableSink]] expects and supports.
+ */
+trait BatchTableSink[T] extends TableSink[T] {
+
+ /** Emits the given [[DataSet]]; implemented by the concrete sink. */
+ def emitDataSet(dataSet: DataSet[T]): Unit
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
new file mode 100644
index 0000000..9cf76dd
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ *
+ * @param path The output path to write the Table to.
+ * @param fieldDelim The field delimiter, ',' by default.
+ */
+class CsvTableSink(
+    path: String,
+    fieldDelim: String = ",")
+  extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
+
+  /** Maps each row of the DataSet to a CSV line and writes the result as text to `path`. */
+  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+    val csvLines = dataSet.map(new CsvFormatter(fieldDelim))
+    csvLines.writeAsText(path)
+  }
+
+  /** Maps each row of the DataStream to a CSV line and writes the result as text to `path`. */
+  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+    val csvLines = dataStream.map(new CsvFormatter(fieldDelim))
+    csvLines.writeAsText(path)
+  }
+
+  /** Creates a new, not yet configured sink with the same path and field delimiter. */
+  override protected def copy: TableSinkBase[Row] =
+    new CsvTableSink(path, fieldDelim)
+
+  /** Builds the row type from the configured field types. */
+  override def getOutputType: TypeInformation[Row] =
+    new RowTypeInfo(getFieldTypes: _*)
+}
+
+/**
+ * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
+ *
+ * @param fieldDelim The field delimiter.
+ */
+class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
+  /**
+   * Renders the fields of the row in positional order, separated by the field delimiter.
+   * Null fields are rendered as empty strings; a row without fields yields an empty string.
+   */
+  override def map(row: Row): String = {
+    val builder = new StringBuilder
+
+    // Iterating over the full index range (instead of reading field 0 unconditionally)
+    // keeps a zero-arity row from failing on an out-of-range field access.
+    for (i <- 0 until row.getArity) {
+      if (i > 0) {
+        builder.append(fieldDelim)
+      }
+      val field = row.getField(i)
+      if (field != null) {
+        builder.append(field.toString)
+      }
+    }
+    builder.mkString
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
new file mode 100644
index 0000000..360252e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
+
+/** Defines an external [[TableSink]] to emit a streaming [[Table]].
+ *
+ * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
+ */
+trait StreamTableSink[T] extends TableSink[T] {
+
+ /** Emits the given [[DataStream]]; implemented by the concrete sink. */
+ def emitDataStream(dataStream: DataStream[T]): Unit
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala
new file mode 100644
index 0000000..8304867
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+
+/** A [[TableSink]] specifies how to emit a [[Table]] to an external
+ * system or location.
+ *
+ * The interface is generic such that it can support different storage locations and formats.
+ *
+ * @tparam T The type of the records that this [[TableSink]] expects (see [[getOutputType]]).
+ */
+trait TableSink[T] {
+
+ /**
+ * Return the type expected by this [[TableSink]].
+ *
+ * This type should depend on the types returned by [[getFieldTypes]].
+ *
+ * @return The type expected by this [[TableSink]].
+ */
+ def getOutputType: TypeInformation[T]
+
+ /** Returns the names of the table fields. */
+ def getFieldNames: Array[String]
+
+ /** Returns the types of the table fields. */
+ def getFieldTypes: Array[TypeInformation[_]]
+
+ /**
+ * Return a copy of this [[TableSink]] configured with the field names and types of the
+ * [[Table]] to emit.
+ *
+ * @param fieldNames The field names of the table to emit.
+ * @param fieldTypes The field types of the table to emit.
+ * @return A copy of this [[TableSink]] configured with the field names and types of the
+ * [[Table]] to emit.
+ */
+ def configure(fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): TableSink[T]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala
new file mode 100644
index 0000000..45866ca
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/TableSinkBase.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+
+trait TableSinkBase[T] extends TableSink[T] {
+  // Both stay None until configure() fills them in on a copy of the sink.
+  private var fieldNames: Option[Array[String]] = None
+  private var fieldTypes: Option[Array[TypeInformation[_]]] = None
+
+  /** Return a deep copy of the [[TableSink]]. */
+  protected def copy: TableSinkBase[T]
+
+  /**
+   * Return the field names of the [[Table]] to emit.
+   *
+   * Throws an IllegalStateException if the sink has not been configured yet.
+   */
+  def getFieldNames: Array[String] =
+    fieldNames.getOrElse(
+      throw new IllegalStateException(
+        "TableSink must be configured to retrieve field names."))
+
+  /**
+   * Return the field types of the [[Table]] to emit.
+   *
+   * Throws an IllegalStateException if the sink has not been configured yet.
+   */
+  def getFieldTypes: Array[TypeInformation[_]] =
+    fieldTypes.getOrElse(
+      throw new IllegalStateException(
+        "TableSink must be configured to retrieve field types."))
+
+  /**
+   * Return a copy of this [[TableSink]] configured with the field names and types of the
+   * [[Table]] to emit.
+   *
+   * @param fieldNames The field names of the table to emit.
+   * @param fieldTypes The field types of the table to emit.
+   * @return A copy of this [[TableSink]] configured with the field names and types of the
+   *         [[Table]] to emit.
+   */
+  final def configure(fieldNames: Array[String],
+                      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
+    // Configure a fresh copy so that this instance remains untouched.
+    val sinkCopy = this.copy
+    sinkCopy.fieldNames = Some(fieldNames)
+    sinkCopy.fieldTypes = Some(fieldTypes)
+    sinkCopy
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
new file mode 100644
index 0000000..0478dc9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
+
+/** Defines an external batch table and provides access to its data.
+ *
+ * @tparam T Type of the [[DataSet]] created by this [[TableSource]].
+ */
+trait BatchTableSource[T] extends TableSource[T] {
+
+ /**
+ * Returns the data of the table as a [[DataSet]]; takes the [[ExecutionEnvironment]] to use.
+ *
+ * NOTE: This method is for internal use only for defining a [[TableSource]].
+ * Do not use it in Table API programs.
+ */
+ def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
new file mode 100644
index 0000000..3f4e395
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CsvInputFormat
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.io.RowCsvInputFormat
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableException
+
+/**
+ * Table source for simple CSV files that can be read both as a [[BatchTableSource]] and
+ * as a [[StreamTableSource]]. The number of fields is (logically) unlimited.
+ *
+ * @param path The path to the CSV file.
+ * @param fieldNames The names of the table fields.
+ * @param fieldTypes The types of the table fields.
+ * @param fieldDelim The field delimiter, "," by default.
+ * @param rowDelim The row delimiter, "\n" by default.
+ * @param quoteCharacter An optional quote character for String values, null by default.
+ * @param ignoreFirstLine Flag to ignore the first line, false by default.
+ * @param ignoreComments An optional prefix to indicate comments, null by default.
+ * @param lenient Flag to skip records with parse errors instead of failing, false by default.
+ */
+class CsvTableSource(
+    path: String,
+    fieldNames: Array[String],
+    fieldTypes: Array[TypeInformation[_]],
+    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
+    quoteCharacter: Character = null,
+    ignoreFirstLine: Boolean = false,
+    ignoreComments: String = null,
+    lenient: Boolean = false)
+  extends BatchTableSource[Row]
+  with StreamTableSource[Row] {
+
+  /**
+   * Convenience constructor that uses the default delimiters, no quote character, no
+   * comment prefix and `false` for both the first-line and the lenient flag.
+   *
+   * @param path The path to the CSV file.
+   * @param fieldNames The names of the table fields.
+   * @param fieldTypes The types of the table fields.
+   */
+  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
+    this(path, fieldNames, fieldTypes,
+      CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER,
+      null, false, null, false)
+
+  // Every field needs exactly one name and one type.
+  if (fieldNames.length != fieldTypes.length) {
+    throw TableException("Number of field names and field types must be equal.")
+  }
+
+  /** Row type built from the field types; used for both the batch and the stream input. */
+  private val returnType = new RowTypeInfo(fieldTypes: _*)
+
+  /** Returns the number of fields of the table. */
+  override def getNumberOfFields: Int = fieldNames.length
+
+  /** Returns the names of the table fields. */
+  override def getFieldsNames: Array[String] = fieldNames
+
+  /** Returns the types of the table fields. */
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
+  override def getReturnType: RowTypeInfo = returnType
+
+  /**
+   * Returns the data of the table as a [[DataSet]] of [[Row]], read through a newly
+   * created CSV input format.
+   *
+   * NOTE: This method is for internal use only for defining a [[TableSource]].
+   * Do not use it in Table API programs.
+   */
+  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
+    execEnv.createInput(createCsvInput(), returnType)
+
+  /**
+   * Returns the data of the table as a [[DataStream]] of [[Row]].
+   *
+   * NOTE: This method is for internal use only for defining a [[TableSource]].
+   * Do not use it in Table API programs.
+   */
+  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] =
+    streamExecEnv.createInput(createCsvInput(), returnType)
+
+  /** Builds a new [[RowCsvInputFormat]] configured from the constructor arguments. */
+  private def createCsvInput(): RowCsvInputFormat = {
+    val csvFormat = new RowCsvInputFormat(new Path(path), returnType, rowDelim, fieldDelim)
+    csvFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
+    csvFormat.setLenient(lenient)
+
+    // Quoting and comment skipping are only switched on when a value was supplied.
+    Option(quoteCharacter).foreach(quote => csvFormat.enableQuotedStringParsing(quote))
+    Option(ignoreComments).foreach(prefix => csvFormat.setCommentPrefix(prefix))
+
+    csvFormat
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
new file mode 100644
index 0000000..429cccb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+ * Adds support for projection push-down to a [[TableSource]].
+ * A [[TableSource]] mixing in this trait is able to project the fields of the returned table.
+ *
+ * @tparam T The return type of the [[ProjectableTableSource]].
+ */
+trait ProjectableTableSource[T] {
+
+ /**
+ * Creates a copy of the [[ProjectableTableSource]] that projects its output on the specified
+ * fields. NOTE(review): presumably positions in the unprojected field list - confirm.
+ *
+ * @param fields The indexes of the fields to return.
+ * @return A copy of the [[ProjectableTableSource]] that projects its output.
+ */
+ def projectFields(fields: Array[Int]): ProjectableTableSource[T]
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
new file mode 100644
index 0000000..7a2737c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+/** Defines an external stream table and provides access to its data as a [[DataStream]].
+ *
+ * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
+ */
+trait StreamTableSource[T] extends TableSource[T] {
+
+ /**
+ * Returns the data of the table as a [[DataStream]]. NOTE: This method is for internal use
+ * only when defining a [[TableSource]]; do not use it in Table API programs.
+ *
+ * @param execEnv The [[StreamExecutionEnvironment]] passed in by the caller.
+ */
+ def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
new file mode 100644
index 0000000..9d4ba68
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/** Defines an external table by providing schema information, i.e., field names and types.
+ *
+ * @tparam T The return type of the [[TableSource]].
+ */
+trait TableSource[T] {
+
+ /** Returns the number of fields of the table. */
+ def getNumberOfFields: Int
+
+ /** Returns the names of the table fields; presumably index-aligned with the types (confirm). */
+ def getFieldsNames: Array[String]
+
+ /** Returns the types of the table fields; presumably index-aligned with the names (confirm). */
+ def getFieldTypes: Array[TypeInformation[_]]
+
+ /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
+ def getReturnType: TypeInformation[T]
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala
new file mode 100644
index 0000000..b4af152
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/InternalTypeInfo.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.util.Preconditions._
+
+/**
+ * Base [[TypeInformation]] for types that exist only inside the Table API for translation
+ * purposes; such types should not be contained in the final plan.
+ */
+@SerialVersionUID(-13064574364925255L)
+abstract class InternalTypeInfo[T](val clazz: Class[T])
+  extends TypeInformation[T]
+  with AtomicType[T] {
+
+  checkNotNull(clazz)
+
+  /** Shared failure path for every operation that is not supported by internal types. */
+  private def internalUseOnly(): Nothing =
+    throw new UnsupportedOperationException("This type is for internal use only.")
+
+  override def isBasicType: Boolean = internalUseOnly()
+
+  override def isTupleType: Boolean = internalUseOnly()
+
+  override def getArity: Int = internalUseOnly()
+
+  override def getTotalFields: Int = internalUseOnly()
+
+  /** Returns the class that was handed to the constructor. */
+  override def getTypeClass: Class[T] = clazz
+
+  override def isKeyType: Boolean = internalUseOnly()
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = internalUseOnly()
+
+  override def createComparator(
+      sortOrderAscending: Boolean,
+      executionConfig: ExecutionConfig)
+    : TypeComparator[T] =
+    internalUseOnly()
+
+  // ----------------------------------------------------------------------------------------------
+
+  /** Hash code derived from `clazz` alone, the same field that `equals` compares. */
+  override def hashCode: Int = Objects.hash(clazz)
+
+  /** Implemented by subclasses to decide which objects may compare equal to them. */
+  def canEqual(obj: Any): Boolean
+
+  /** Equal to another [[InternalTypeInfo]] that accepts this one and wraps the same class. */
+  override def equals(obj: Any): Boolean = obj match {
+    case that: InternalTypeInfo[_] =>
+      that.canEqual(this) && (clazz eq that.clazz)
+    case _ =>
+      false
+  }
+
+  override def toString: String = "InternalTypeInfo"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala
new file mode 100644
index 0000000..bbc20aa
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/RowIntervalTypeInfo.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+/** [[InternalTypeInfo]] describing row intervals, backed by `Long`. */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo extends InternalTypeInfo[Long](classOf[Long]) {
+  def canEqual(obj: Any): Boolean = obj match {
+    case _: RowIntervalTypeInfo => true
+    case _ => false
+  }
+
+  override def toString: String = "RowIntervalTypeInfo"
+}
+
+object RowIntervalTypeInfo {
+
+  /** Shared instance of the row interval type information. */
+  val INTERVAL_ROWS: RowIntervalTypeInfo = new RowIntervalTypeInfo
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala
new file mode 100644
index 0000000..9d76050
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIntervalTypeInfo.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+ * [[TypeInformation]] for SQL INTERVAL types.
+ *
+ * @param clazz The class of the values of this type.
+ * @param serializer The serializer returned by `createSerializer`.
+ * @param comparatorClass The comparator class instantiated by `createComparator`.
+ */
+@SerialVersionUID(-1816179424364825258L)
+class TimeIntervalTypeInfo[T](
+    val clazz: Class[T],
+    val serializer: TypeSerializer[T],
+    val comparatorClass: Class[_ <: TypeComparator[T]])
+  extends TypeInformation[T]
+  with AtomicType[T] {
+
+  checkNotNull(clazz)
+  checkNotNull(serializer)
+  checkNotNull(comparatorClass)
+
+  override def isBasicType: Boolean = false
+  override def isTupleType: Boolean = false
+  override def isKeyType: Boolean = true
+
+  override def getArity: Int = 1
+  override def getTotalFields: Int = 1
+  override def getTypeClass: Class[T] = clazz
+
+  /** Returns the serializer given to the constructor; `config` is not consulted. */
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
+
+  /** Creates a new comparator of `comparatorClass` for the requested sort order. */
+  override def createComparator(
+      sortOrderAscending: Boolean,
+      executionConfig: ExecutionConfig)
+    : TypeComparator[T] = instantiateComparator(comparatorClass, sortOrderAscending)
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass)
+
+  def canEqual(obj: Any): Boolean = obj.isInstanceOf[TimeIntervalTypeInfo[_]]
+
+  /** Two instances are equal when class, serializer and comparator class all agree. */
+  override def equals(obj: Any): Boolean = obj match {
+    case that: TimeIntervalTypeInfo[_] =>
+      that.canEqual(this) &&
+        (clazz eq that.clazz) &&
+        serializer == that.serializer &&
+        (comparatorClass eq that.comparatorClass)
+    case _ => false
+  }
+
+  override def toString: String = "TimeIntervalTypeInfo(" + clazz.getSimpleName + ")"
+}
+
+object TimeIntervalTypeInfo {
+
+  /** Interval type with `java.lang.Integer` values, using [[IntSerializer]]. */
+  val INTERVAL_MONTHS = new TimeIntervalTypeInfo(
+    classOf[java.lang.Integer], IntSerializer.INSTANCE, classOf[IntComparator])
+
+  /** Interval type with `java.lang.Long` values, using [[LongSerializer]]. */
+  val INTERVAL_MILLIS = new TimeIntervalTypeInfo(
+    classOf[java.lang.Long], LongSerializer.INSTANCE, classOf[LongComparator])
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+   * Reflectively invokes the comparator's constructor taking a single `boolean`; any
+   * [[Exception]] raised on the way is rethrown wrapped in a [[RuntimeException]].
+   */
+  private def instantiateComparator[X](
+      comparatorClass: Class[_ <: TypeComparator[X]],
+      ascendingOrder: java.lang.Boolean)
+    : TypeComparator[X] =
+    try {
+      val ctor = comparatorClass.getConstructor(java.lang.Boolean.TYPE)
+      ctor.newInstance(ascendingOrder)
+    } catch {
+      case cause: Exception =>
+        val message = s"Could not initialize comparator ${comparatorClass.getName}"
+        throw new RuntimeException(message, cause)
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
new file mode 100644
index 0000000..40f0cf2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.validate._
+
+object TypeCheckUtils {
+
+  /**
+   * Checks if type information is an advanced type that can be converted to a SQL type
+   * but NOT vice versa. Every type that is not simple counts as advanced.
+   */
+  def isAdvanced(dataType: TypeInformation[_]): Boolean = !isSimple(dataType)
+
+  /**
+   * Checks if type information is a simple type that can be converted to a SQL type and
+   * vice versa: a basic type, a SQL time type or a time interval type.
+   */
+  def isSimple(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: BasicTypeInfo[_] | _: SqlTimeTypeInfo[_] | _: TimeIntervalTypeInfo[_] => true
+    case _ => false
+  }
+
+  /** Checks for a [[NumericTypeInfo]] or the big decimal type. */
+  def isNumeric(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[NumericTypeInfo[_]] || BIG_DEC_TYPE_INFO == dataType
+
+  /** Checks for a time point or a time interval type. */
+  def isTemporal(dataType: TypeInformation[_]): Boolean =
+    isTimePoint(dataType) || isTimeInterval(dataType)
+
+  /** Checks for a [[SqlTimeTypeInfo]]. */
+  def isTimePoint(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: SqlTimeTypeInfo[_] => true
+    case _ => false
+  }
+
+  /** Checks for a [[TimeIntervalTypeInfo]]. */
+  def isTimeInterval(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: TimeIntervalTypeInfo[_] => true
+    case _ => false
+  }
+
+  def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
+
+  def isBoolean(dataType: TypeInformation[_]): Boolean = dataType == BOOLEAN_TYPE_INFO
+
+  def isDecimal(dataType: TypeInformation[_]): Boolean = dataType == BIG_DEC_TYPE_INFO
+
+  def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
+
+  /** Checks for an object array or a primitive array type. */
+  def isArray(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[ObjectArrayTypeInfo[_, _]] ||
+      dataType.isInstanceOf[PrimitiveArrayTypeInfo[_]]
+
+  /** Checks that the type class implements [[Comparable]] and the type is not an array. */
+  def isComparable(dataType: TypeInformation[_]): Boolean =
+    classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
+
+  /** Succeeds for numeric types, otherwise fails with a message that names `caller`. */
+  def assertNumericExpr(
+      dataType: TypeInformation[_],
+      caller: String)
+    : ValidationResult =
+    if (isNumeric(dataType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$caller requires numeric types, get $dataType here")
+    }
+
+  /** Succeeds for sort key types, otherwise fails with a message that names `caller`. */
+  def assertOrderableExpr(dataType: TypeInformation[_], caller: String): ValidationResult =
+    if (dataType.isSortKeyType) ValidationSuccess
+    else ValidationFailure(s"$caller requires orderable types, get $dataType here")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
new file mode 100644
index 0000000..99e94b3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
+
+/**
+ * Utilities for type conversions: type widening as well as safe and supported casts.
+ */
+object TypeCoercion {
+
+  /** Numeric types ordered from the narrowest to the widest one. */
+  val numericWideningPrecedence: IndexedSeq[TypeInformation[_]] =
+    IndexedSeq(
+      BYTE_TYPE_INFO, SHORT_TYPE_INFO, INT_TYPE_INFO,
+      LONG_TYPE_INFO, FLOAT_TYPE_INFO, DOUBLE_TYPE_INFO)
+
+  /** Tells whether the type is listed in [[numericWideningPrecedence]]. */
+  private def isNumericWideningType(tpe: TypeInformation[_]): Boolean =
+    numericWideningPrecedence.contains(tpe)
+
+  /**
+   * Determines the wider of two types.
+   *
+   * Equal types yield that type. Otherwise a String operand yields String, then a big
+   * decimal operand yields big decimal, then a SQL time type paired with a time interval
+   * yields the SQL time type, and finally two types from [[numericWideningPrecedence]]
+   * yield the one that is listed later.
+   *
+   * @return The wider type, or `None` if none of the rules applies.
+   */
+  def widerTypeOf(tp1: TypeInformation[_], tp2: TypeInformation[_]): Option[TypeInformation[_]] =
+    (tp1, tp2) match {
+      case (left, right) if left == right => Some(left)
+      case (_, STRING_TYPE_INFO) | (STRING_TYPE_INFO, _) => Some(STRING_TYPE_INFO)
+      case (_, BIG_DEC_TYPE_INFO) | (BIG_DEC_TYPE_INFO, _) => Some(BIG_DEC_TYPE_INFO)
+
+      case (timePoint: SqlTimeTypeInfo[_], _: TimeIntervalTypeInfo[_]) => Some(timePoint)
+      case (_: TimeIntervalTypeInfo[_], timePoint: SqlTimeTypeInfo[_]) => Some(timePoint)
+
+      case _ if isNumericWideningType(tp1) && isNumericWideningType(tp2) =>
+        val widerIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
+        Some(numericWideningPrecedence(widerIndex))
+
+      case _ => None
+    }
+
+  /**
+   * Tests whether a cast can be done safely, i.e. without loss of information.
+   */
+  def canSafelyCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
+    case (_, STRING_TYPE_INFO) | (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
+
+    // Within the widening order only a strictly wider target type counts as safe.
+    case _ if isNumericWideningType(from) && isNumericWideningType(to) =>
+      numericWideningPrecedence.indexOf(from) < numericWideningPrecedence.indexOf(to)
+
+    case _ => false
+  }
+
+  /**
+   * All the supported cast types in flink-table.
+   * Note: This may lose information during the cast.
+   */
+  def canCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean = (from, to) match {
+    case (source, target) if source == target => true
+    case (_, STRING_TYPE_INFO) => true
+
+    case (_, CHAR_TYPE_INFO) => false // Character type not supported.
+
+    // String to numeric, boolean, decimal and SQL time types.
+    case (STRING_TYPE_INFO, _: NumericTypeInfo[_]) => true
+    case (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO | BIG_DEC_TYPE_INFO) => true
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME) => true
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
+
+    // Boolean to and from numeric and decimal types.
+    case (BOOLEAN_TYPE_INFO, _: NumericTypeInfo[_] | BIG_DEC_TYPE_INFO) => true
+    case (_: NumericTypeInfo[_] | BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) => true
+
+    // Numeric and decimal types among each other.
+    case (_: NumericTypeInfo[_] | BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_]) => true
+    case (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) => true
+
+    // Int and Long to SQL time and time interval types.
+    case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME) => true
+    case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
+    case (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) => true
+    case (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
+
+    // DATE and TIME do not convert into each other; other SQL time pairs do.
+    case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) => false
+    case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false
+    case (_: SqlTimeTypeInfo[_], _: SqlTimeTypeInfo[_]) => true
+
+    // SQL time and time interval types back to Int and Long.
+    case (SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => true
+    case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => true
+    case (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) => true
+    case (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true
+
+    case _ => false
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
new file mode 100644
index 0000000..a2a120b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.core.JoinRelType._
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+import scala.collection.JavaConversions._
+
+ /**
+   * Helpers for choosing the physical Flink result type of an operator and for translating
+   * between Calcite and Flink join types.
+   */
+ object TypeConverter {
+
+   /** A [[RowTypeInfo]] created without fields, exposed as `TypeInformation[Any]`. */
+   val DEFAULT_ROW_TYPE: TypeInformation[Any] =
+     new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
+
+   /**
+     * Determines the return type of Flink operators based on the logical fields, the expected
+     * physical type and configuration parameters.
+     *
+     * For example:
+     *   - No physical type expected, only 3 non-null fields and efficient type usage enabled
+     *       -> return Tuple3
+     *   - No physical type expected, efficient type usage enabled, but 3 nullable fields
+     *       -> return Row because Tuple does not support null values
+     *   - Physical type expected
+     *       -> check if physical type is compatible and return it
+     *
+     * @param logicalRowType logical row information
+     * @param expectedPhysicalType expected physical type
+     * @param nullable fields can be nullable
+     * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
+     * @throws TableException if an expected (non-Row) physical type does not match the
+     *                        logical fields in arity, field names (POJO) or field types
+     * @return suitable return type
+     */
+   def determineReturnType(
+       logicalRowType: RelDataType,
+       expectedPhysicalType: Option[TypeInformation[Any]],
+       nullable: Boolean,
+       useEfficientTypes: Boolean)
+     : TypeInformation[Any] = {
+     // convert the logical fields to type information
+     val logicalFieldTypes = logicalRowType.getFieldList map { field =>
+       FlinkTypeFactory.toTypeInfo(field.getType)
+     }
+     // field names
+     val logicalFieldNames = logicalRowType.getFieldNames.toList
+
+     val returnType = expectedPhysicalType match {
+       // Row is expected, build a row type from the logical fields
+       case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
+         new RowTypeInfo(logicalFieldTypes: _*)
+
+       // any other physical type is expected:
+       // it must be compatible with the logical fields and is returned unchanged
+       case Some(typeInfo) =>
+         validateExpectedType(typeInfo, logicalFieldNames, logicalFieldTypes)
+         typeInfo
+
+       // no physical type
+       // determine type based on logical fields and configuration parameters
+       case None =>
+         // we cannot use efficient types if they are disabled,
+         // if row arity > tuple arity, or if fields are nullable
+         val rowRequired =
+           !useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable
+         if (rowRequired) {
+           new RowTypeInfo(logicalFieldTypes: _*)
+         }
+         // a single field is represented by its own (atomic) type
+         else if (logicalFieldTypes.length == 1) {
+           logicalFieldTypes.head
+         }
+         // otherwise use a tuple
+         else {
+           new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
+         }
+     }
+     returnType.asInstanceOf[TypeInformation[Any]]
+   }
+
+   /**
+     * Checks that an expected physical type can hold the logical fields.
+     *
+     * @param expectedType physical type requested by the caller (not a Row type)
+     * @param fieldNames names of the logical fields
+     * @param fieldTypes types of the logical fields, in the same order as `fieldNames`
+     * @throws TableException if arity, POJO field names or field types do not match,
+     *                        or if the kind of `expectedType` is not supported
+     */
+   private def validateExpectedType(
+       expectedType: TypeInformation[Any],
+       fieldNames: Seq[String],
+       fieldTypes: Seq[TypeInformation[_]])
+     : Unit = {
+     if (expectedType.getArity != fieldTypes.length) {
+       throw new TableException("Arity of result does not match expected type.")
+     }
+     expectedType match {
+
+       // POJO type expected: fields are matched by name
+       case pt: PojoTypeInfo[_] =>
+         fieldNames.zip(fieldTypes) foreach {
+           case (fName, fType) =>
+             val pojoIdx = pt.getFieldIndex(fName)
+             if (pojoIdx < 0) {
+               throw new TableException(s"POJO does not define field name: $fName")
+             }
+             val expectedTypeInfo = pt.getTypeAt(pojoIdx)
+             if (fType != expectedTypeInfo) {
+               throw new TableException(s"Result field does not match expected type. " +
+                 s"Expected: $expectedTypeInfo; Actual: $fType")
+             }
+         }
+
+       // Tuple/Case class type expected: fields are matched by position
+       case ct: CompositeType[_] =>
+         fieldTypes.zipWithIndex foreach {
+           case (fieldTypeInfo, i) =>
+             val expectedTypeInfo = ct.getTypeAt(i)
+             if (fieldTypeInfo != expectedTypeInfo) {
+               throw new TableException(s"Result field does not match expected type. " +
+                 s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
+             }
+         }
+
+       // Atomic type expected: compared against the first logical field
+       case at: AtomicType[_] =>
+         val fieldTypeInfo = fieldTypes.head
+         if (fieldTypeInfo != at) {
+           throw new TableException(s"Result field does not match expected type. " +
+             s"Expected: $at; Actual: $fieldTypeInfo")
+         }
+
+       case _ =>
+         throw new TableException("Unsupported result type.")
+     }
+   }
+
+   /** Maps a Calcite [[JoinRelType]] (INNER, LEFT, RIGHT, FULL) to the Flink [[JoinType]]. */
+   def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
+     case INNER => JoinType.INNER
+     case LEFT => JoinType.LEFT_OUTER
+     case RIGHT => JoinType.RIGHT_OUTER
+     case FULL => JoinType.FULL_OUTER
+   }
+
+   /** Maps a Flink [[JoinType]] to the corresponding Calcite [[JoinRelType]]. */
+   def flinkJoinTypeToRelType(joinType: JoinType): JoinRelType = joinType match {
+     case JoinType.INNER => JoinRelType.INNER
+     case JoinType.LEFT_OUTER => JoinRelType.LEFT
+     case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
+     case JoinType.FULL_OUTER => JoinRelType.FULL
+   }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
new file mode 100644
index 0000000..f92b3a1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.validate
+
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable, ReflectiveSqlOperatorTable}
+import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.functions.utils.{TableSqlFunction, UserDefinedFunctionUtils}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+/**
+ * A catalog for looking up (user-defined) functions, used during validation phases
+ * of both Table API and SQL API.
+ */
+class FunctionCatalog {
+
+ private val functionBuilders = mutable.HashMap.empty[String, Class[_]]
+ private val sqlFunctions = mutable.ListBuffer[SqlFunction]()
+
+ def registerFunction(name: String, builder: Class[_]): Unit =
+ functionBuilders.put(name.toLowerCase, builder)
+
+ def registerSqlFunction(sqlFunction: SqlFunction): Unit = {
+ sqlFunctions --= sqlFunctions.filter(_.getName == sqlFunction.getName)
+ sqlFunctions += sqlFunction
+ }
+
+ /**
+ * Register multiple SQL functions at the same time. The functions have the same name.
+ */
+ def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
+ if (functions.nonEmpty) {
+ val name = functions.head.getName
+ // check that all functions have the same name
+ if (functions.forall(_.getName == name)) {
+ sqlFunctions --= sqlFunctions.filter(_.getName == name)
+ sqlFunctions ++= functions
+ } else {
+ throw ValidationException("The SQL functions to be registered have different names.")
+ }
+ }
+ }
+
+ def getSqlOperatorTable: SqlOperatorTable =
+ ChainedSqlOperatorTable.of(
+ new BasicOperatorTable(),
+ new ListSqlOperatorTable(sqlFunctions)
+ )
+
+ /**
+ * Lookup and create an expression if we find a match.
+ */
+ def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+ val funcClass = functionBuilders
+ .getOrElse(name.toLowerCase, throw ValidationException(s"Undefined function: $name"))
+
+ // Instantiate a function using the provided `children`
+ funcClass match {
+
+ // user-defined scalar function call
+ case sf if classOf[ScalarFunction].isAssignableFrom(sf) =>
+ Try(UserDefinedFunctionUtils.instantiate(sf.asInstanceOf[Class[ScalarFunction]])) match {
+ case Success(scalarFunction) => ScalarFunctionCall(scalarFunction, children)
+ case Failure(e) => throw ValidationException(e.getMessage)
+ }
+
+ // user-defined table function call
+ case tf if classOf[TableFunction[_]].isAssignableFrom(tf) =>
+ val tableSqlFunction = sqlFunctions
+ .find(f => f.getName.equalsIgnoreCase(name) && f.isInstanceOf[TableSqlFunction])
+ .getOrElse(throw ValidationException(s"Undefined table function: $name"))
+ .asInstanceOf[TableSqlFunction]
+ val typeInfo = tableSqlFunction.getRowTypeInfo
+ val function = tableSqlFunction.getTableFunction
+ TableFunctionCall(name, function, children, typeInfo)
+
+ // general expression call
+ case expression if classOf[Expression].isAssignableFrom(expression) =>
+ // try to find a constructor accepts `Seq[Expression]`
+ Try(funcClass.getDeclaredConstructor(classOf[Seq[_]])) match {
+ case Success(seqCtor) =>
+ Try(seqCtor.newInstance(children).asInstanceOf[Expression]) match {
+ case Success(expr) => expr
+ case Failure(e) => throw new ValidationException(e.getMessage)
+ }
+ case Failure(e) =>
+ val childrenClass = Seq.fill(children.length)(classOf[Expression])
+ // try to find a constructor matching the exact number of children
+ Try(funcClass.getDeclaredConstructor(childrenClass: _*)) match {
+ case Success(ctor) =>
+ Try(ctor.newInstance(children: _*).asInstanceOf[Expression]) match {
+ case Success(expr) => expr
+ case Failure(exception) => throw ValidationException(exception.getMessage)
+ }
+ case Failure(exception) =>
+ throw ValidationException(s"Invalid number of arguments for function $funcClass")
+ }
+ }
+
+ case _ =>
+ throw ValidationException("Unsupported function.")
+ }
+ }
+
+ /**
+ * Drop a function and return if the function existed.
+ */
+ def dropFunction(name: String): Boolean =
+ functionBuilders.remove(name.toLowerCase).isDefined
+
+ /**
+ * Drop all registered functions.
+ */
+ def clear(): Unit = functionBuilders.clear()
+}
+
+ /**
+   * Companion of [[FunctionCatalog]] that lists the built-in Table API functions and
+   * creates catalogs pre-populated with them.
+   */
+ object FunctionCatalog {
+
+   /** Table API name of every built-in function, mapped to the expression class behind it. */
+   val builtInFunctions: Map[String, Class[_]] = Map(
+     // logic
+     "isNull" -> classOf[IsNull],
+     "isNotNull" -> classOf[IsNotNull],
+     "isTrue" -> classOf[IsTrue],
+     "isFalse" -> classOf[IsFalse],
+     "isNotTrue" -> classOf[IsNotTrue],
+     "isNotFalse" -> classOf[IsNotFalse],
+
+     // aggregate functions
+     "avg" -> classOf[Avg],
+     "count" -> classOf[Count],
+     "max" -> classOf[Max],
+     "min" -> classOf[Min],
+     "sum" -> classOf[Sum],
+
+     // string functions
+     "charLength" -> classOf[CharLength],
+     "initCap" -> classOf[InitCap],
+     "like" -> classOf[Like],
+     "lowerCase" -> classOf[Lower],
+     "similar" -> classOf[Similar],
+     "substring" -> classOf[Substring],
+     "trim" -> classOf[Trim],
+     "upperCase" -> classOf[Upper],
+     "position" -> classOf[Position],
+     "overlay" -> classOf[Overlay],
+
+     // math functions
+     "abs" -> classOf[Abs],
+     "ceil" -> classOf[Ceil],
+     "exp" -> classOf[Exp],
+     "floor" -> classOf[Floor],
+     "log10" -> classOf[Log10],
+     "ln" -> classOf[Ln],
+     "power" -> classOf[Power],
+     "mod" -> classOf[Mod],
+     "sqrt" -> classOf[Sqrt],
+
+     // temporal functions
+     "extract" -> classOf[Extract],
+     "currentDate" -> classOf[CurrentDate],
+     "currentTime" -> classOf[CurrentTime],
+     "currentTimestamp" -> classOf[CurrentTimestamp],
+     "localTime" -> classOf[LocalTime],
+     "localTimestamp" -> classOf[LocalTimestamp],
+     "quarter" -> classOf[Quarter],
+     "temporalOverlaps" -> classOf[TemporalOverlaps],
+
+     // array
+     "cardinality" -> classOf[ArrayCardinality],
+     "at" -> classOf[ArrayElementAt],
+     "element" -> classOf[ArrayElement]
+
+     // TODO implement function overloading here
+     // "floor" -> classOf[TemporalFloor]
+     // "ceil" -> classOf[TemporalCeil]
+   )
+
+   /**
+     * Builds a fresh catalog and registers every entry of [[builtInFunctions]] in it.
+     */
+   def withBuiltIns: FunctionCatalog = {
+     val catalog = new FunctionCatalog()
+     for ((functionName, functionClass) <- builtInFunctions) {
+       catalog.registerFunction(functionName, functionClass)
+     }
+     catalog
+   }
+ }
+
+ /**
+   * Operator table holding the SQL operators and functions listed below. All of them are
+   * registered when an instance is constructed.
+   */
+ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
+
+   /**
+     * Supported SQL operators / functions, in registration order.
+     *
+     * This list should be kept in sync with [[SqlStdOperatorTable]].
+     */
+   private val supportedOperators: Seq[SqlOperator] = Seq(
+     // SET OPERATORS
+     SqlStdOperatorTable.UNION,
+     SqlStdOperatorTable.UNION_ALL,
+     SqlStdOperatorTable.EXCEPT,
+     SqlStdOperatorTable.EXCEPT_ALL,
+     SqlStdOperatorTable.INTERSECT,
+     SqlStdOperatorTable.INTERSECT_ALL,
+     // BINARY OPERATORS
+     SqlStdOperatorTable.AND,
+     SqlStdOperatorTable.AS,
+     SqlStdOperatorTable.CONCAT,
+     SqlStdOperatorTable.DIVIDE,
+     SqlStdOperatorTable.DIVIDE_INTEGER,
+     SqlStdOperatorTable.DOT,
+     SqlStdOperatorTable.EQUALS,
+     SqlStdOperatorTable.GREATER_THAN,
+     SqlStdOperatorTable.IS_DISTINCT_FROM,
+     SqlStdOperatorTable.IS_NOT_DISTINCT_FROM,
+     SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+     SqlStdOperatorTable.LESS_THAN,
+     SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+     SqlStdOperatorTable.MINUS,
+     SqlStdOperatorTable.MULTIPLY,
+     SqlStdOperatorTable.NOT_EQUALS,
+     SqlStdOperatorTable.OR,
+     SqlStdOperatorTable.PLUS,
+     SqlStdOperatorTable.DATETIME_PLUS,
+     // POSTFIX OPERATORS
+     SqlStdOperatorTable.DESC,
+     SqlStdOperatorTable.NULLS_FIRST,
+     SqlStdOperatorTable.IS_NOT_NULL,
+     SqlStdOperatorTable.IS_NULL,
+     SqlStdOperatorTable.IS_NOT_TRUE,
+     SqlStdOperatorTable.IS_TRUE,
+     SqlStdOperatorTable.IS_NOT_FALSE,
+     SqlStdOperatorTable.IS_FALSE,
+     SqlStdOperatorTable.IS_NOT_UNKNOWN,
+     SqlStdOperatorTable.IS_UNKNOWN,
+     // PREFIX OPERATORS
+     SqlStdOperatorTable.NOT,
+     SqlStdOperatorTable.UNARY_MINUS,
+     SqlStdOperatorTable.UNARY_PLUS,
+     // AGGREGATE OPERATORS
+     SqlStdOperatorTable.SUM,
+     SqlStdOperatorTable.COUNT,
+     SqlStdOperatorTable.MIN,
+     SqlStdOperatorTable.MAX,
+     SqlStdOperatorTable.AVG,
+     // ARRAY OPERATORS
+     SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
+     SqlStdOperatorTable.ITEM,
+     SqlStdOperatorTable.CARDINALITY,
+     SqlStdOperatorTable.ELEMENT,
+     // SPECIAL OPERATORS
+     SqlStdOperatorTable.ROW,
+     SqlStdOperatorTable.OVERLAPS,
+     SqlStdOperatorTable.LITERAL_CHAIN,
+     SqlStdOperatorTable.BETWEEN,
+     SqlStdOperatorTable.SYMMETRIC_BETWEEN,
+     SqlStdOperatorTable.NOT_BETWEEN,
+     SqlStdOperatorTable.SYMMETRIC_NOT_BETWEEN,
+     SqlStdOperatorTable.NOT_LIKE,
+     SqlStdOperatorTable.LIKE,
+     SqlStdOperatorTable.NOT_SIMILAR_TO,
+     SqlStdOperatorTable.SIMILAR_TO,
+     SqlStdOperatorTable.CASE,
+     SqlStdOperatorTable.REINTERPRET,
+     SqlStdOperatorTable.EXTRACT_DATE,
+     // FUNCTIONS
+     SqlStdOperatorTable.SUBSTRING,
+     SqlStdOperatorTable.OVERLAY,
+     SqlStdOperatorTable.TRIM,
+     SqlStdOperatorTable.POSITION,
+     SqlStdOperatorTable.CHAR_LENGTH,
+     SqlStdOperatorTable.CHARACTER_LENGTH,
+     SqlStdOperatorTable.UPPER,
+     SqlStdOperatorTable.LOWER,
+     SqlStdOperatorTable.INITCAP,
+     SqlStdOperatorTable.POWER,
+     SqlStdOperatorTable.SQRT,
+     SqlStdOperatorTable.MOD,
+     SqlStdOperatorTable.LN,
+     SqlStdOperatorTable.LOG10,
+     SqlStdOperatorTable.ABS,
+     SqlStdOperatorTable.EXP,
+     SqlStdOperatorTable.NULLIF,
+     SqlStdOperatorTable.COALESCE,
+     SqlStdOperatorTable.FLOOR,
+     SqlStdOperatorTable.CEIL,
+     SqlStdOperatorTable.LOCALTIME,
+     SqlStdOperatorTable.LOCALTIMESTAMP,
+     SqlStdOperatorTable.CURRENT_TIME,
+     SqlStdOperatorTable.CURRENT_TIMESTAMP,
+     SqlStdOperatorTable.CURRENT_DATE,
+     SqlStdOperatorTable.CAST,
+     SqlStdOperatorTable.EXTRACT,
+     SqlStdOperatorTable.QUARTER,
+     SqlStdOperatorTable.SCALAR_QUERY,
+     SqlStdOperatorTable.EXISTS
+   )
+
+   // register each supported operator with this table, in list order
+   supportedOperators.foreach(operator => register(operator))
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
new file mode 100644
index 0000000..64a568b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/ValidationResult.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.validate
+
+/**
+ * Represents the result of a validation.
+ */
+sealed trait ValidationResult {
+ def isFailure: Boolean = !isSuccess
+ def isSuccess: Boolean
+
+ /**
+ * Allows constructing a cascade of validation results.
+ * The first failure result will be returned.
+ */
+ def orElse(other: ValidationResult): ValidationResult = {
+ if (isSuccess) {
+ other
+ } else {
+ this
+ }
+ }
+}
+
+/**
+ * Represents the successful result of a validation.
+ */
+object ValidationSuccess extends ValidationResult {
+ val isSuccess: Boolean = true
+}
+
+/**
+ * Represents the failing result of a validation,
+ * with a error message to show the reason of failure.
+ */
+case class ValidationFailure(message: String) extends ValidationResult {
+ val isSuccess: Boolean = false
+}