Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:51 UTC

[22/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
new file mode 100644
index 0000000..b31367c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.validate._
+
+case class Abs(child: Expression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Abs")
+
+  override def toString: String = s"abs($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.ABS, child.toRexNode)
+  }
+}
+
+case class Ceil(child: Expression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Ceil")
+
+  override def toString: String = s"ceil($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CEIL, child.toRexNode)
+  }
+}
+
+case class Exp(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"exp($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.EXP, child.toRexNode)
+  }
+}
+
+case class Floor(child: Expression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "Floor")
+
+  override def toString: String = s"floor($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.FLOOR, child.toRexNode)
+  }
+}
+
+case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"log10($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LOG10, child.toRexNode)
+  }
+}
+
+case class Ln(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"ln($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LN, child.toRexNode)
+  }
+}
+
+case class Power(left: Expression, right: Expression) extends BinaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+
+  override def toString: String = s"pow($left, $right)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.POWER, left.toRexNode, right.toRexNode)
+  }
+}
+
+case class Sqrt(child: Expression) extends UnaryExpression with InputTypeSpec {
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(DOUBLE_TYPE_INFO)
+
+  override def toString: String = s"sqrt($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.POWER, child.toRexNode, Literal(0.5).toRexNode)
+  }
+}

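These nodes back the arithmetic methods of the Scala Table API expression DSL. A minimal
usage sketch, assuming the DSL methods abs(), ceil(), exp(), log10(), ln(), and sqrt() from
ImplicitExpressionOperations and a hypothetical table t with a numeric field 'a:

    import org.apache.flink.table.api.scala._

    // 'a.abs() builds Abs('a), 'a.exp() builds Exp('a), and so on; each node
    // later renders itself into a Calcite RexNode via its toRexNode method.
    val projected = t.select('a.abs(), 'a.ceil(), 'a.exp(), 'a.log10(), 'a.ln(), 'a.sqrt())

Note that Sqrt names no dedicated SQL operator above: its toRexNode emits POWER(child, 0.5).
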
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ordering.scala
new file mode 100644
index 0000000..7f03827
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ordering.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.validate._
+
+abstract class Ordering extends UnaryExpression {
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!child.isInstanceOf[NamedExpression]) {
+      ValidationFailure(s"Sort should only based on field reference")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class Asc(child: Expression) extends Ordering {
+  override def toString: String = s"($child).asc"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    child.toRexNode
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+}
+
+case class Desc(child: Expression) extends Ordering {
+  override def toString: String = s"($child).desc"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.desc(child.toRexNode)
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+}

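Asc and Desc back the ordering DSL. A short sketch, assuming a hypothetical table t with a
field 'a:

    import org.apache.flink.table.api.scala._

    // 'a.asc builds Asc('a); per Ordering.validateInput, sorting is only
    // accepted on a field reference, not on an arbitrary expression.
    val sorted = t.orderBy('a.asc)

Asc.toRexNode simply forwards the child (ascending is the RelBuilder default), while Desc
wraps the child in relBuilder.desc.
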
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/package.scala
new file mode 100644
index 0000000..41e0c9f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST classes.
+ * Expression trees should not be constructed manually by users. They are built implicitly
+ * by the DSL conversions in
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated by a string parser that parses expressions and creates
+ * AST nodes.
+ */
+package object expressions

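To make this concrete: the same tree can be obtained through the DSL or spelled out by hand.
A hedged sketch, where GreaterThan, UnresolvedFieldReference, and Literal are expression
classes from sibling files of this package that are not part of this diff:

    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.expressions._

    // built implicitly by the DSL conversions
    val viaDsl: Expression = 'amount > 10

    // the equivalent tree constructed manually (discouraged for users)
    val explicit: Expression =
      GreaterThan(UnresolvedFieldReference("amount"), Literal(10))
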
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..f4b58cc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import scala.collection.JavaConversions._
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.expressions.TrimMode.TrimMode
+import org.apache.flink.table.validate._
+
+/**
+  * Returns the length of `str`.
+  */
+case class CharLength(child: Expression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"CharLength operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"($child).charLength()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CHAR_LENGTH, child.toRexNode)
+  }
+}
+
+/**
+  * Returns `str` with the first letter of each word in uppercase.
+  * All other letters are in lowercase. Words are delimited by white space.
+  */
+case class InitCap(child: Expression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"InitCap operator requires String input, " + 
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"($child).initCap()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.INITCAP, child.toRexNode)
+  }
+}
+
+/**
+  * Returns true if `str` matches `pattern`.
+  */
+case class Like(str: Expression, pattern: Expression) extends BinaryExpression {
+  private[flink] def left: Expression = str
+  private[flink] def right: Expression = pattern
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Like operator requires (String, String) input, " +
+        s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString: String = s"($str).like($pattern)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LIKE, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns `str` with all characters changed to lowercase.
+  */
+case class Lower(child: Expression) extends UnaryExpression {
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Lower operator requires String input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+
+  override def toString: String = s"($child).toLowerCase()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.LOWER, child.toRexNode)
+  }
+}
+
+/**
+  * Returns true if `str` is similar to `pattern`.
+  */
+case class Similar(str: Expression, pattern: Expression) extends BinaryExpression {
+  private[flink] def left: Expression = str
+  private[flink] def right: Expression = pattern
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Similar operator requires (String, String) input, " +
+        s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
+    }
+  }
+
+  override def toString: String = s"($str).similarTo($pattern)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.SIMILAR_TO, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Returns a substring of `str`, starting at position `begin` (inclusive), of the given `length`.
+  */
+case class Substring(
+    str: Expression,
+    begin: Expression,
+    length: Expression) extends Expression with InputTypeSpec {
+
+  def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))
+
+  override private[flink] def children: Seq[Expression] = str :: begin :: length :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override def toString: String = s"($str).substring($begin, $length)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.SUBSTRING, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Trims `trimString` from `str` according to `trimMode`.
+  */
+case class Trim(
+    trimMode: Expression,
+    trimString: Expression,
+    str: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = trimMode :: trimString :: str :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    trimMode match {
+      case SymbolExpression(_: TrimMode) =>
+        if (trimString.resultType != STRING_TYPE_INFO) {
+          ValidationFailure(s"String expected for trimString, get ${trimString.resultType}")
+        } else if (str.resultType != STRING_TYPE_INFO) {
+          ValidationFailure(s"String expected for str, get ${str.resultType}")
+        } else {
+          ValidationSuccess
+        }
+      case _ => ValidationFailure("TrimMode symbol expected.")
+    }
+  }
+
+  override def toString: String = s"($str).trim($trimMode, $trimString)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.TRIM, children.map(_.toRexNode))
+  }
+}
+
+/**
+  * Constants for trim expressions.
+  */
+object TrimConstants {
+  val TRIM_DEFAULT_CHAR = Literal(" ")
+}
+
+/**
+  * Returns `str` with all characters changed to uppercase.
+  */
+case class Upper(child: Expression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO)
+
+  override def toString: String = s"($child).upperCase()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.UPPER, child.toRexNode)
+  }
+}
+
+/**
+  * Returns the position of string `needle` in string `haystack`.
+  */
+case class Position(needle: Expression, haystack: Expression)
+    extends Expression with InputTypeSpec {
+
+  override private[flink] def children: Seq[Expression] = Seq(needle, haystack)
+
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO)
+
+  override def toString: String = s"($needle).position($haystack)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.POSITION, needle.toRexNode, haystack.toRexNode)
+  }
+}
+
+/**
+  * Replaces a substring of a string with a replacement string,
+  * starting at a given position and for a given length.
+  */
+case class Overlay(
+    str: Expression,
+    replacement: Expression,
+    starting: Expression,
+    position: Expression)
+  extends Expression with InputTypeSpec {
+
+  def this(str: Expression, replacement: Expression, starting: Expression) =
+    this(str, replacement, starting, CharLength(replacement))
+
+  override private[flink] def children: Seq[Expression] =
+    Seq(str, replacement, starting, position)
+
+  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
+
+  override def toString: String = s"($str).overlay($replacement, $starting, $position)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(
+      SqlStdOperatorTable.OVERLAY,
+      str.toRexNode,
+      replacement.toRexNode,
+      starting.toRexNode,
+      position.toRexNode)
+  }
+}

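These string nodes surface as method calls on expressions, mirroring the toString forms
above. A hedged sketch, assuming a hypothetical table users with a String field 'name:

    import org.apache.flink.table.api.scala._

    // upperCase() builds Upper('name), charLength() builds CharLength('name),
    // like(...) builds Like('name, "J%"); validation rejects non-String inputs.
    val projected = users.select(
      'name.upperCase(),
      'name.charLength(),
      'name.like("J%"))
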
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
new file mode 100644
index 0000000..0d71fb2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/symbols.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlTrimFunction
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.{existentials, implicitConversions}
+
+/**
+  * General expression class to represent a symbol.
+  */
+case class SymbolExpression(symbol: TableSymbol) extends LeafExpression {
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw new UnsupportedOperationException("This should not happen. A symbol has no result type.")
+
+  def toExpr: SymbolExpression = this // triggers implicit conversion
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // dirty hack to pass a Java enum from Scala to Java
+    val enum = symbol.enum.asInstanceOf[Enum[T] forSome { type T <: Enum[T] }]
+    relBuilder.getRexBuilder.makeFlag(enum)
+  }
+
+  override def toString: String = s"${symbol.symbols}.${symbol.name}"
+
+}
+
+/**
+  * Symbol that wraps a Calcite symbol in the form of a Java enum.
+  */
+trait TableSymbol {
+  def symbols: TableSymbols
+  def name: String
+  def enum: Enum[_]
+}
+
+/**
+  * Enumeration of symbols.
+  */
+abstract class TableSymbols extends Enumeration {
+
+  class TableSymbolValue(e: Enum[_]) extends Val(e.name()) with TableSymbol {
+    override def symbols: TableSymbols = TableSymbols.this
+
+    override def enum: Enum[_] = e
+
+    override def name: String = toString()
+  }
+
+  protected final def Value(enum: Enum[_]): TableSymbolValue = new TableSymbolValue(enum)
+
+  implicit def symbolToExpression(symbol: TableSymbolValue): SymbolExpression =
+    SymbolExpression(symbol)
+
+}
+
+/**
+  * Units for working with time intervals.
+  */
+object TimeIntervalUnit extends TableSymbols {
+
+  type TimeIntervalUnit = TableSymbolValue
+
+  val YEAR = Value(TimeUnitRange.YEAR)
+  val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
+  val MONTH = Value(TimeUnitRange.MONTH)
+  val DAY = Value(TimeUnitRange.DAY)
+  val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
+  val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
+  val DAY_TO_SECOND = Value(TimeUnitRange.DAY_TO_SECOND)
+  val HOUR = Value(TimeUnitRange.HOUR)
+  val HOUR_TO_MINUTE = Value(TimeUnitRange.HOUR_TO_MINUTE)
+  val HOUR_TO_SECOND = Value(TimeUnitRange.HOUR_TO_SECOND)
+  val MINUTE = Value(TimeUnitRange.MINUTE)
+  val MINUTE_TO_SECOND = Value(TimeUnitRange.MINUTE_TO_SECOND)
+  val SECOND = Value(TimeUnitRange.SECOND)
+
+}
+
+/**
+  * Units for working with time points.
+  */
+object TimePointUnit extends TableSymbols {
+
+  type TimePointUnit = TableSymbolValue
+
+  val YEAR = Value(TimeUnit.YEAR)
+  val MONTH = Value(TimeUnit.MONTH)
+  val DAY = Value(TimeUnit.DAY)
+  val HOUR = Value(TimeUnit.HOUR)
+  val MINUTE = Value(TimeUnit.MINUTE)
+  val SECOND = Value(TimeUnit.SECOND)
+  val QUARTER = Value(TimeUnit.QUARTER)
+  val WEEK = Value(TimeUnit.WEEK)
+  val MILLISECOND = Value(TimeUnit.MILLISECOND)
+  val MICROSECOND = Value(TimeUnit.MICROSECOND)
+
+}
+
+/**
+  * Modes for trimming strings.
+  */
+object TrimMode extends TableSymbols {
+
+  type TrimMode = TableSymbolValue
+
+  val BOTH = Value(SqlTrimFunction.Flag.BOTH)
+  val LEADING = Value(SqlTrimFunction.Flag.LEADING)
+  val TRAILING = Value(SqlTrimFunction.Flag.TRAILING)
+
+}

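A symbol travels through the API as a SymbolExpression and ends up as a Calcite flag. A
hedged sketch of that round trip, assuming a hypothetical table t with a TIMESTAMP field 'ts
and the extract() method from ImplicitExpressionOperations:

    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.expressions.TimeIntervalUnit

    // TimeIntervalUnit.YEAR is a TableSymbolValue; symbolToExpression wraps it
    // into a SymbolExpression, whose toRexNode calls makeFlag(TimeUnitRange.YEAR).
    val years = t.select('ts.extract(TimeIntervalUnit.YEAR))
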
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
new file mode 100644
index 0000000..f09e2ad
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.expressions.ExpressionUtils.{divide, getFactor, mod}
+import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess}
+
+import scala.collection.JavaConversions._
+
+case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTemporal(temporal.resultType)) {
+      return ValidationFailure(s"Extract operator requires Temporal input, " +
+        s"but $temporal is of type ${temporal.resultType}")
+    }
+
+    timeIntervalUnit match {
+      case SymbolExpression(TimeIntervalUnit.YEAR)
+           | SymbolExpression(TimeIntervalUnit.MONTH)
+           | SymbolExpression(TimeIntervalUnit.DAY)
+        if temporal.resultType == SqlTimeTypeInfo.DATE
+          || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
+          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+        ValidationSuccess
+
+      case SymbolExpression(TimeIntervalUnit.HOUR)
+           | SymbolExpression(TimeIntervalUnit.MINUTE)
+           | SymbolExpression(TimeIntervalUnit.SECOND)
+        if temporal.resultType == SqlTimeTypeInfo.TIME
+          || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
+          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+        ValidationSuccess
+
+      case _ =>
+        ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" +
+          s" of type '${temporal.resultType}'.")
+    }
+  }
+
+  override def toString: String = s"($temporal).extract($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // get wrapped Calcite unit
+    val timeUnitRange = timeIntervalUnit
+      .asInstanceOf[SymbolExpression]
+      .symbol
+      .enum
+      .asInstanceOf[TimeUnitRange]
+
+    // convert RexNodes
+    convertExtract(
+      timeIntervalUnit.toRexNode,
+      timeUnitRange,
+      temporal.toRexNode,
+      relBuilder.asInstanceOf[FlinkRelBuilder])
+  }
+
+  /**
+    * Standard conversion of the EXTRACT operator.
+    * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertExtract()]]
+    */
+  private def convertExtract(
+      timeUnitRangeRexNode: RexNode,
+      timeUnitRange: TimeUnitRange,
+      temporal: RexNode,
+      relBuilder: FlinkRelBuilder)
+    : RexNode = {
+
+    // TODO convert this into Table API expressions to make the code more readable
+    val rexBuilder = relBuilder.getRexBuilder
+    val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
+    var result = rexBuilder.makeReinterpretCast(
+      resultType,
+      temporal,
+      rexBuilder.makeLiteral(false))
+
+    val unit = timeUnitRange.startUnit
+    val sqlTypeName = temporal.getType.getSqlTypeName
+    unit match {
+      case TimeUnit.YEAR | TimeUnit.MONTH | TimeUnit.DAY =>
+        sqlTypeName match {
+          case SqlTypeName.TIMESTAMP =>
+            result = divide(rexBuilder, result, TimeUnit.DAY.multiplier)
+            return rexBuilder.makeCall(
+              resultType,
+              SqlStdOperatorTable.EXTRACT_DATE,
+              Seq(timeUnitRangeRexNode, result))
+
+          case SqlTypeName.DATE =>
+            return rexBuilder.makeCall(
+              resultType,
+              SqlStdOperatorTable.EXTRACT_DATE,
+              Seq(timeUnitRangeRexNode, result))
+
+          case _ => // do nothing
+        }
+
+      case _ => // do nothing
+    }
+
+    result = mod(rexBuilder, resultType, result, getFactor(unit))
+    result = divide(rexBuilder, result, unit.multiplier)
+    result
+  }
+}
+
+abstract class TemporalCeilFloor(
+    timeIntervalUnit: Expression,
+    temporal: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
+
+  override private[flink] def resultType: TypeInformation[_] = temporal.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(temporal.resultType)) {
+      return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
+        s"but $temporal is of type ${temporal.resultType}")
+    }
+    val unit = timeIntervalUnit match {
+      case SymbolExpression(u: TimeIntervalUnit) => Some(u)
+      case _ => None
+    }
+    if (unit.isEmpty) {
+      return ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " +
+        s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}")
+    }
+
+    (unit.get, temporal.resultType) match {
+      case (TimeIntervalUnit.YEAR | TimeIntervalUnit.MONTH,
+          SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case (TimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case (TimeIntervalUnit.HOUR | TimeIntervalUnit.MINUTE | TimeIntervalUnit.SECOND,
+          SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
+        ValidationSuccess
+      case _ =>
+        ValidationFailure(s"Temporal ceil/floor operator does not support " +
+          s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.")
+    }
+  }
+}
+
+case class TemporalFloor(
+    timeIntervalUnit: Expression,
+    temporal: Expression)
+  extends TemporalCeilFloor(
+    timeIntervalUnit,
+    temporal) {
+
+  override def toString: String = s"($temporal).floor($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.FLOOR, temporal.toRexNode, timeIntervalUnit.toRexNode)
+  }
+}
+
+case class TemporalCeil(
+    timeIntervalUnit: Expression,
+    temporal: Expression)
+  extends TemporalCeilFloor(
+    timeIntervalUnit,
+    temporal) {
+
+  override def toString: String = s"($temporal).ceil($timeIntervalUnit)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.CEIL, temporal.toRexNode, timeIntervalUnit.toRexNode)
+  }
+}
+
+abstract class CurrentTimePoint(
+    targetType: TypeInformation[_],
+    local: Boolean)
+  extends LeafExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = targetType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(targetType)) {
+      ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " +
+        s"but get $targetType.")
+    } else if (local && targetType == SqlTimeTypeInfo.DATE) {
+      ValidationFailure(s"Localized CurrentTimePoint operator requires Time or Timestamp target " +
+        s"type, but get $targetType.")
+    } else {
+      ValidationSuccess
+    }
+  }
+
+  override def toString: String = if (local) {
+    s"local$targetType()"
+  } else {
+    s"current$targetType()"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operator = targetType match {
+      case SqlTimeTypeInfo.TIME if local => SqlStdOperatorTable.LOCALTIME
+      case SqlTimeTypeInfo.TIMESTAMP if local => SqlStdOperatorTable.LOCALTIMESTAMP
+      case SqlTimeTypeInfo.DATE => SqlStdOperatorTable.CURRENT_DATE
+      case SqlTimeTypeInfo.TIME => SqlStdOperatorTable.CURRENT_TIME
+      case SqlTimeTypeInfo.TIMESTAMP => SqlStdOperatorTable.CURRENT_TIMESTAMP
+    }
+    relBuilder.call(operator)
+  }
+}
+
+case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
+
+case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
+
+case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
+
+case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
+
+case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
+
+/**
+  * Extracts the quarter of a year from a SQL date.
+  */
+case class Quarter(child: Expression) extends UnaryExpression with InputTypeSpec {
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(SqlTimeTypeInfo.DATE)
+
+  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override def toString: String = s"($child).quarter()"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    /**
+      * Standard conversion of the QUARTER operator.
+      * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertQuarter()]]
+      */
+    Plus(
+      Div(
+        Minus(
+          Extract(TimeIntervalUnit.MONTH, child),
+          Literal(1L)),
+        Literal(TimeUnit.QUARTER.multiplier.longValue())),
+      Literal(1L)
+    ).toRexNode
+  }
+}
+
+/**
+  * Determines whether two anchored time intervals overlap.
+  */
+case class TemporalOverlaps(
+    leftTimePoint: Expression,
+    leftTemporal: Expression,
+    rightTimePoint: Expression,
+    rightTemporal: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+    Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+
+  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(leftTimePoint.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " +
+        s"Time Point, but get ${leftTimePoint.resultType}.")
+    }
+    if (!TypeCheckUtils.isTimePoint(rightTimePoint.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires rightTimePoint to be of " +
+        s"type Time Point, but get ${rightTimePoint.resultType}.")
+    }
+    if (leftTimePoint.resultType != rightTimePoint.resultType) {
+      return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint and " +
+        s"rightTimePoint to be of same type.")
+    }
+
+    // if leftTemporal is a time point, it must be comparable with leftTimePoint
+    if (TypeCheckUtils.isTimePoint(leftTemporal.resultType)) {
+      if (leftTemporal.resultType != leftTimePoint.resultType) {
+        return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal and " +
+          s"leftTimePoint to be of same type if leftTemporal is of type Time Point.")
+      }
+    } else if (!isTimeInterval(leftTemporal.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    }
+
+    // if rightTemporal is a time point, it must be comparable with rightTimePoint
+    if (TypeCheckUtils.isTimePoint(rightTemporal.resultType)) {
+      if (rightTemporal.resultType != rightTimePoint.resultType) {
+        return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal and " +
+          s"rightTimePoint to be of same type if rightTemporal is of type Time Point.")
+      }
+    } else if (!isTimeInterval(rightTemporal.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    }
+    ValidationSuccess
+  }
+
+  override def toString: String = s"temporalOverlaps(${children.mkString(", ")})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    convertOverlaps(
+      leftTimePoint.toRexNode,
+      leftTemporal.toRexNode,
+      rightTimePoint.toRexNode,
+      rightTemporal.toRexNode,
+      relBuilder.asInstanceOf[FlinkRelBuilder])
+  }
+
+  /**
+    * Standard conversion of the OVERLAPS operator.
+    * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
+    */
+  private def convertOverlaps(
+      leftP: RexNode,
+      leftT: RexNode,
+      rightP: RexNode,
+      rightT: RexNode,
+      relBuilder: FlinkRelBuilder)
+    : RexNode = {
+    // leftT = leftP + leftT if leftT is an interval
+    val convLeftT = if (isTimeInterval(leftTemporal.resultType)) {
+        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, leftP, leftT)
+      } else {
+        leftT
+      }
+    // rightT = rightP + rightT if rightT is an interval
+    val convRightT = if (isTimeInterval(rightTemporal.resultType)) {
+        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, rightP, rightT)
+      } else {
+        rightT
+      }
+    // leftT >= rightP
+    val leftPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convLeftT, rightP)
+    // rightT >= leftP
+    val rightPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convRightT, leftP)
+
+    // leftT >= rightP and rightT >= leftP
+    relBuilder.call(SqlStdOperatorTable.AND, leftPred, rightPred)
+  }
+}

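The Quarter node above reduces to integer arithmetic on the extracted month. Since
TimeUnit.QUARTER.multiplier is 3, the expansion (extract(MONTH) - 1) / 3 + 1 works out as:

    // month  1 (January):  (1 - 1) / 3 + 1 = 1   -> first quarter
    // month  7 (July):     (7 - 1) / 3 + 1 = 3   -> third quarter
    // month 12 (December): (12 - 1) / 3 + 1 = 4  -> fourth quarter
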
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
new file mode 100644
index 0000000..990d928
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
+
+abstract class WindowProperty(child: Expression) extends UnaryExpression {
+
+  override def toString = s"WindowProperty($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("WindowProperty cannot be transformed to RexNode.")
+
+  override private[flink] def validateInput() =
+    if (child.isInstanceOf[WindowReference]) {
+      ValidationSuccess
+    } else {
+      ValidationFailure("Child must be a window reference.")
+    }
+
+  private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder)
+    : NamedWindowProperty = NamedWindowProperty(name, this)
+}
+
+case class WindowStart(child: Expression) extends WindowProperty(child) {
+
+  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+  override def toString: String = s"start($child)"
+}
+
+case class WindowEnd(child: Expression) extends WindowProperty(child) {
+
+  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+  override def toString: String = s"end($child)"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
new file mode 100644
index 0000000..d01cf68
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall}
+
+/**
+  * Base class for a user-defined scalar function. A user-defined scalar function maps zero, one,
+  * or multiple scalar values to a new scalar value.
+  *
+  * The behavior of a [[ScalarFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
+  * can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be instantiable at runtime.
+  *
+  * By default the result type of an evaluation method is determined by Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
+  * If a user-defined scalar function should not introduce much overhead during runtime, it is
+  * recommended to declare parameters and result types as primitive types instead of their boxed
+  * classes. Internally, DATE/TIME values are represented as int and TIMESTAMP values as long.
+  */
+abstract class ScalarFunction extends UserDefinedFunction {
+
+  /**
+    * Creates a call to a [[ScalarFunction]] in Scala Table API.
+    *
+    * @param params actual parameters of function
+    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
+    */
+  final def apply(params: Expression*): Expression = {
+    ScalarFunctionCall(this, params)
+  }
+
+  override def toString: String = getClass.getCanonicalName
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns the result type of the evaluation method with a given signature.
+    *
+    * This method needs to be overridden in case Flink's type extraction facilities are not
+    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
+    * method. Flink's type extraction facilities can handle basic types or
+    * simple POJOs but might be wrong for more complex, custom, or composite types.
+    *
+    * @param signature signature of the method for which the return type needs to be determined
+    * @return [[TypeInformation]] of the result type, or null if Flink should determine the type
+    */
+  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
+
+  /**
+    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
+    * signature.
+    *
+    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
+    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
+    * By default Flink's type extraction facilities are used for this but might be wrong for
+    * more complex, custom, or composite types.
+    *
+    * @param signature signature of the method for which the operand types need to be determined
+    * @return [[TypeInformation]] of the operand types
+    */
+  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
+    signature.map { c =>
+      try {
+        TypeExtractor.getForClass(c)
+      } catch {
+        case _: InvalidTypesException =>
+          throw new ValidationException(
+            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
+            s"automatically determined. Please provide type information manually.")
+      }
+    }
+  }
+}

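An end-to-end sketch of the contract described above; HashCode, myTable, and 'name are
hypothetical, and SQL/Java API usage additionally requires registration via
TableEnvironment.registerFunction:

    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.functions.ScalarFunction

    // minimal UDF: default constructor, one public eval method,
    // primitive result type to keep code generation overhead low
    class HashCode extends ScalarFunction {
      def eval(s: String): Int = s.hashCode
    }

    val hashCode = new HashCode
    // Scala Table API: calling the instance goes through apply() above
    val result = myTable.select(hashCode('name))
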
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
new file mode 100644
index 0000000..653793e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.expressions.{Expression, TableFunctionCall}
+
+/**
+  * Base class for a user-defined table function (UDTF). A user-defined table function works on
+  * zero, one, or multiple scalar values as input and returns multiple rows as output.
+  *
+  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
+  * method. An evaluation method must be declared publicly, must not be static, and named "eval".
+  * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
+  *
+  * User-defined functions must have a default constructor and must be instantiable at runtime.
+  *
+  * By default the result type of an evaluation method is determined by Flink's type extraction
+  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
+  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
+  * can be manually defined by overriding [[getResultType()]].
+  *
+  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
+  * If a user-defined table function should not introduce much overhead during runtime, it is
+  * recommended to declare parameters and result types as primitive types instead of their boxed
+  * classes. Internally, DATE/TIME values are represented as int and TIMESTAMP values as long.
+  *
+  * Example:
+  *
+  * {{{
+  *
+  *   public class Split extends TableFunction<String> {
+  *
+  *     // implement an "eval" method with as many parameters as you want
+  *     public void eval(String str) {
+  *       for (String s : str.split(" ")) {
+  *         collect(s);   // use collect(...) to emit an output row
+  *       }
+  *     }
+  *
+  *     // you can overload the eval method here ...
+  *   }
+  *
+  *   val tEnv: TableEnvironment = ...
+  *   val table: Table = ...    // schema: [a: String]
+  *
+  *   // for Scala users
+  *   val split = new Split()
+  *   table.join(split('a) as ('s)).select('a, 's)
+  *
+  *   // for Java users
+  *   tEnv.registerFunction("split", new Split())   // register table function first
+  *   table.join("split(a) as (s)").select("a, s")
+  *
+  *   // for SQL users
+  *   tEnv.registerFunction("split", new Split())   // register table function first
+  *   tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
+  *
+  * }}}
+  *
+  * @tparam T The type of the output row
+  */
+abstract class TableFunction[T] extends UserDefinedFunction {
+
+  /**
+    * Creates a call to a [[TableFunction]] in Scala Table API.
+    *
+    * @param params actual parameters of function
+    * @return [[Expression]] in form of a [[TableFunctionCall]]
+    */
+  final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
+    val resultType = if (getResultType == null) {
+      typeInfo
+    } else {
+      getResultType
+    }
+    TableFunctionCall(getClass.getSimpleName, this, params, resultType)
+  }
+
+  override def toString: String = getClass.getCanonicalName
+
+  // ----------------------------------------------------------------------------------------------
+
+  private val rows: util.ArrayList[T] = new util.ArrayList[T]()
+
+  /**
+    * Emits an output row.
+    *
+    * @param row the output row
+    */
+  protected def collect(row: T): Unit = {
+    // cache rows for now; a later version might process them immediately instead
+    rows.add(row)
+  }
+
+  /**
+    * Internal use. Returns an iterator over the buffered rows.
+    */
+  def getRowsIterator: util.Iterator[T] = rows.iterator()
+
+  /**
+    * Internal use. Clear buffered rows.
+    */
+  def clear(): Unit = rows.clear()
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns the result type of the evaluation method with a given signature.
+    *
+    * This method needs to be overridden in case Flink's type extraction facilities are not
+    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
+    * method. Flink's type extraction facilities can handle basic types or
+    * simple POJOs but might be wrong for more complex, custom, or composite types.
+    *
+    * @return [[TypeInformation]] of result type or null if Flink should determine the type
+    */
+  def getResultType: TypeInformation[T] = null
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
new file mode 100644
index 0000000..b99ab8d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+/**
+  * Base class for all user-defined functions such as scalar functions, table functions,
+  * or aggregation functions.
+  *
+  * User-defined functions must have a default constructor and must be instantiable during runtime.
+  */
+trait UserDefinedFunction {
+}

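As a hedged sketch of the instantiability requirement stated above (both classes are
hypothetical): a top-level class with a default constructor can be instantiated by the
framework, while a class offering only a parameterized constructor cannot.

    import org.apache.flink.table.functions.UserDefinedFunction

    // OK: top-level class with an implicit default constructor.
    class HashCode extends UserDefinedFunction

    // Not instantiable by the framework: no default constructor.
    class ScaledHashCode(factor: Int) extends UserDefinedFunction
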
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
new file mode 100644
index 0000000..64e4bc4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/MathFunctions.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.utils
+
+import java.math.{BigDecimal => JBigDecimal}
+
+class MathFunctions {}
+
+object MathFunctions {
+  def power(a: Double, b: JBigDecimal): Double = {
+    Math.pow(a, b.doubleValue())
+  }
+}

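A short usage sketch of the helper above; note that the BigDecimal exponent is narrowed to
Double before delegation, so precision beyond Double is not preserved (values illustrative):

    import java.math.{BigDecimal => JBigDecimal}
    import org.apache.flink.table.functions.utils.MathFunctions

    val result = MathFunctions.power(2.0, new JBigDecimal("10"))  // 1024.0
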
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
new file mode 100644
index 0000000..da652e0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.utils
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.functions.utils.ScalarSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getResultType, getSignature, getSignatures, signatureToString, signaturesToString}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Calcite wrapper for user-defined scalar functions.
+  *
+  * @param name function name (used by SQL parser)
+  * @param scalarFunction scalar function to be called
+  * @param typeFactory type factory for converting between Flink's and Calcite's types
+  */
+class ScalarSqlFunction(
+    name: String,
+    scalarFunction: ScalarFunction,
+    typeFactory: FlinkTypeFactory)
+  extends SqlFunction(
+    new SqlIdentifier(name, SqlParserPos.ZERO),
+    createReturnTypeInference(name, scalarFunction, typeFactory),
+    createOperandTypeInference(scalarFunction, typeFactory),
+    createOperandTypeChecker(name, scalarFunction),
+    null,
+    SqlFunctionCategory.USER_DEFINED_FUNCTION) {
+
+  def getScalarFunction: ScalarFunction = scalarFunction
+
+}
+
+object ScalarSqlFunction {
+
+  private[flink] def createReturnTypeInference(
+      name: String,
+      scalarFunction: ScalarFunction,
+      typeFactory: FlinkTypeFactory)
+    : SqlReturnTypeInference = {
+    /**
+      * Return type inference based on the information given by the [[ScalarFunction]].
+      */
+    new SqlReturnTypeInference {
+      override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
+        val parameters = opBinding
+          .collectOperandTypes()
+          .asScala
+          .map { operandType =>
+            if (operandType.getSqlTypeName == SqlTypeName.NULL) {
+              null
+            } else {
+              FlinkTypeFactory.toTypeInfo(operandType)
+            }
+          }
+        val foundSignature = getSignature(scalarFunction, parameters)
+        if (foundSignature.isEmpty) {
+          throw new ValidationException(
+            s"Given parameters of function '$name' do not match any signature. \n" +
+              s"Actual: ${signatureToString(parameters)} \n" +
+              s"Expected: ${signaturesToString(scalarFunction)}")
+        }
+        val resultType = getResultType(scalarFunction, foundSignature.get)
+        typeFactory.createTypeFromTypeInfo(resultType)
+      }
+    }
+  }
+
+  private[flink] def createOperandTypeInference(
+      scalarFunction: ScalarFunction,
+      typeFactory: FlinkTypeFactory)
+    : SqlOperandTypeInference = {
+    /**
+      * Operand type inference based on the information given by the [[ScalarFunction]].
+      */
+    new SqlOperandTypeInference {
+      override def inferOperandTypes(
+          callBinding: SqlCallBinding,
+          returnType: RelDataType,
+          operandTypes: Array[RelDataType]): Unit = {
+
+        val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+        val foundSignature = getSignature(scalarFunction, operandTypeInfo)
+          .getOrElse(throw new ValidationException("Operand types could not be inferred."))
+
+        val inferredTypes = scalarFunction
+          .getParameterTypes(foundSignature)
+          .map(typeFactory.createTypeFromTypeInfo)
+
+        inferredTypes.zipWithIndex.foreach {
+          case (inferredType, i) =>
+            operandTypes(i) = inferredType
+        }
+      }
+    }
+  }
+
+  private[flink] def createOperandTypeChecker(
+      name: String,
+      scalarFunction: ScalarFunction)
+    : SqlOperandTypeChecker = {
+
+    val signatures = getSignatures(scalarFunction)
+
+    /**
+      * Operand type checker based on the information given by the [[ScalarFunction]].
+      */
+    new SqlOperandTypeChecker {
+      override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
+        s"$opName[${signaturesToString(scalarFunction)}]"
+      }
+
+      override def getOperandCountRange: SqlOperandCountRange = {
+        val signatureLengths = signatures.map(_.length)
+        SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
+      }
+
+      override def checkOperandTypes(
+          callBinding: SqlCallBinding,
+          throwOnFailure: Boolean)
+        : Boolean = {
+        val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+        val foundSignature = getSignature(scalarFunction, operandTypeInfo)
+
+        if (foundSignature.isEmpty) {
+          if (throwOnFailure) {
+            throw new ValidationException(
+              s"Given parameters of function '$name' do not match any signature. \n" +
+                s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+                s"Expected: ${signaturesToString(scalarFunction)}")
+          } else {
+            false
+          }
+        } else {
+          true
+        }
+      }
+
+      override def isOptional(i: Int): Boolean = false
+
+      override def getConsistency: Consistency = Consistency.NONE
+
+    }
+  }
+
+  private[flink] def getOperandTypeInfo(callBinding: SqlCallBinding): Seq[TypeInformation[_]] = {
+    val operandTypes = for (i <- 0 until callBinding.getOperandCount)
+      yield callBinding.getOperandType(i)
+    operandTypes.map { operandType =>
+      if (operandType.getSqlTypeName == SqlTypeName.NULL) {
+        null
+      } else {
+        FlinkTypeFactory.toTypeInfo(operandType)
+      }
+    }
+  }
+}

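As a hedged sketch of what the operand type checker above enforces, consider a hypothetical
ScalarFunction with two overloaded eval methods; each overload contributes one signature, and
a call matching none of them triggers the ValidationException shown above:

    import org.apache.flink.table.functions.ScalarFunction

    // Hypothetical function with the signatures (String) and (String, Int).
    class SubstringFunc extends ScalarFunction {
      def eval(s: String): String = s
      def eval(s: String, len: Int): String = s.substring(0, Math.min(len, s.length))
    }

    // substringFunc('abc')    -> resolves to eval(String)
    // substringFunc('abc', 2) -> resolves to eval(String, Int)
    // substringFunc(1, 2)     -> no matching signature, ValidationException
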
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
new file mode 100644
index 0000000..74f3374
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.utils
+
+import com.google.common.base.Predicate
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
+import org.apache.calcite.util.Util
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
+
+import scala.collection.JavaConverters._
+import java.util
+
+/**
+  * Calcite wrapper for user-defined table functions.
+  */
+class TableSqlFunction(
+    name: String,
+    udtf: TableFunction[_],
+    rowTypeInfo: TypeInformation[_],
+    returnTypeInference: SqlReturnTypeInference,
+    operandTypeInference: SqlOperandTypeInference,
+    operandTypeChecker: SqlOperandTypeChecker,
+    paramTypes: util.List[RelDataType],
+    functionImpl: FlinkTableFunctionImpl[_])
+  extends SqlUserDefinedTableFunction(
+    new SqlIdentifier(name, SqlParserPos.ZERO),
+    returnTypeInference,
+    operandTypeInference,
+    operandTypeChecker,
+    paramTypes,
+    functionImpl) {
+
+  /**
+    * Returns the user-defined table function.
+    */
+  def getTableFunction: TableFunction[_] = udtf
+
+  /**
+    * Returns the type information of the table returned by the table function.
+    */
+  def getRowTypeInfo: TypeInformation[_] = rowTypeInfo
+
+  /**
+    * Returns additional mapping information if the returned table type is a POJO
+    * (POJO types have no deterministic field order).
+    */
+  def getPojoFieldMapping = functionImpl.fieldIndexes
+
+}
+
+object TableSqlFunction {
+
+  /**
+    * Utility function to create a [[TableSqlFunction]].
+    *
+    * @param name function name (used by SQL parser)
+    * @param udtf user-defined table function to be called
+    * @param rowTypeInfo the row type information generated by the table function
+    * @param typeFactory type factory for converting between Flink's and Calcite's types
+    * @param functionImpl Calcite table function schema
+    * @return [[TableSqlFunction]]
+    */
+  def apply(
+    name: String,
+    udtf: TableFunction[_],
+    rowTypeInfo: TypeInformation[_],
+    typeFactory: FlinkTypeFactory,
+    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
+
+    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
+    val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
+    // derives operands' data types and type families
+    functionImpl.getParameters.asScala.foreach { o =>
+      val relType: RelDataType = o.getType(typeFactory)
+      argTypes.add(relType)
+      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
+    }
+    // determines whether the parameter at position 'input' is optional
+    val optional: Predicate[Integer] = new Predicate[Integer]() {
+      def apply(input: Integer): Boolean = {
+        functionImpl.getParameters.get(input).isOptional
+      }
+    }
+    // create type check for the operands
+    val typeChecker: FamilyOperandTypeChecker = OperandTypes.family(typeFamilies, optional)
+
+    new TableSqlFunction(
+      name,
+      udtf,
+      rowTypeInfo,
+      ReturnTypes.CURSOR,
+      InferTypes.explicit(argTypes),
+      typeChecker,
+      argTypes,
+      functionImpl)
+  }
+}

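The getPojoFieldMapping accessor above exists because POJO results have no deterministic field
order. A hedged sketch of how a table function can sidestep that entirely, assuming a Scala
case class as the output type (case classes give Flink's type extraction a fixed field order,
so no extra mapping is needed; the names below are hypothetical):

    import org.apache.flink.table.functions.TableFunction

    // Hypothetical output type with a deterministic field order.
    case class WordLen(word: String, len: Int)

    class WordLength extends TableFunction[WordLen] {
      def eval(str: String): Unit =
        str.split(" ").foreach(w => collect(WordLen(w, w.length)))
    }
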
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
new file mode 100644
index 0000000..aa3fab0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.table.functions.utils
+
+import java.lang.reflect.{Method, Modifier}
+import java.sql.{Date, Time, Timestamp}
+
+import com.google.common.primitives.Primitives
+import org.apache.calcite.sql.SqlFunction
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction}
+import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
+import org.apache.flink.util.InstantiationUtil
+
+object UserDefinedFunctionUtils {
+
+  /**
+    * Instantiates a user-defined function.
+    */
+  def instantiate[T <: UserDefinedFunction](clazz: Class[T]): T = {
+    val constructor = clazz.getDeclaredConstructor()
+    constructor.setAccessible(true)
+    constructor.newInstance()
+  }
+
+  /**
+    * Checks if a user-defined function can be easily instantiated.
+    */
+  def checkForInstantiation(clazz: Class[_]): Unit = {
+    if (!InstantiationUtil.isPublic(clazz)) {
+      throw ValidationException("Function class is not public.")
+    }
+    else if (!InstantiationUtil.isProperClass(clazz)) {
+      throw ValidationException("Function class is no proper class, it is either abstract," +
+        " an interface, or a primitive type.")
+    }
+    else if (InstantiationUtil.isNonStaticInnerClass(clazz)) {
+      throw ValidationException("The class is an inner class, but not statically accessible.")
+    }
+
+    // check for default constructor (can be private)
+    clazz
+      .getDeclaredConstructors
+      .find(_.getParameterTypes.isEmpty)
+      .getOrElse(throw ValidationException("Function class needs a default constructor."))
+  }
+
+  /**
+    * Checks whether the given class is a Scala object. Implementing a [[TableFunction]] as a
+    * Scala object is forbidden because of concurrency risks.
+    */
+  def checkNotSingleton(clazz: Class[_]): Unit = {
+    // TODO this is not a reliable way to detect a singleton; improve it in the future
+    if (clazz.getFields.map(_.getName) contains "MODULE$") {
+      throw new ValidationException(
+        s"TableFunction implemented by class ${clazz.getCanonicalName} " +
+          s"is a Scala object, it is forbidden since concurrent risks.")
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Utilities for eval methods
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns the eval method signature matching the given signature of [[TypeInformation]],
+    * if any. Elements of the signature can be null (acting as a wildcard).
+    */
+  def getSignature(
+      function: UserDefinedFunction,
+      signature: Seq[TypeInformation[_]])
+    : Option[Array[Class[_]]] = {
+    // We compare the raw Java classes not the TypeInformation.
+    // TypeInformation does not matter during runtime (e.g. within a MapFunction).
+    val actualSignature = typeInfoToClass(signature)
+    val signatures = getSignatures(function)
+
+    signatures
+      // go over all signatures and find one matching the actual signature
+      .find { curSig =>
+        // match parameters of the signature to the actual parameters
+        actualSignature.length == curSig.length &&
+          curSig.zipWithIndex.forall { case (clazz, i) =>
+            parameterTypeEquals(actualSignature(i), clazz)
+          }
+      }
+  }
+
+  /**
+    * Returns the eval method matching the given signature of [[TypeInformation]], if any.
+    */
+  def getEvalMethod(
+      function: UserDefinedFunction,
+      signature: Seq[TypeInformation[_]])
+    : Option[Method] = {
+    // We compare the raw Java classes not the TypeInformation.
+    // TypeInformation does not matter during runtime (e.g. within a MapFunction).
+    val actualSignature = typeInfoToClass(signature)
+    val evalMethods = checkAndExtractEvalMethods(function)
+
+    evalMethods
+      // go over all eval methods and find one matching
+      .find { cur =>
+        val signatures = cur.getParameterTypes
+        // match parameters of the signature to the actual parameters
+        actualSignature.length == signatures.length &&
+          signatures.zipWithIndex.forall { case (clazz, i) =>
+            parameterTypeEquals(actualSignature(i), clazz)
+          }
+      }
+  }
+
+  /**
+    * Extracts "eval" methods and throws a [[ValidationException]] if no implementation
+    * can be found.
+    */
+  def checkAndExtractEvalMethods(function: UserDefinedFunction): Array[Method] = {
+    val methods = function
+      .getClass
+      .getDeclaredMethods
+      .filter { m =>
+        val modifiers = m.getModifiers
+        m.getName == "eval" &&
+          Modifier.isPublic(modifiers) &&
+          !Modifier.isAbstract(modifiers) &&
+          !(function.isInstanceOf[TableFunction[_]] && Modifier.isStatic(modifiers))
+      }
+
+    if (methods.isEmpty) {
+      throw new ValidationException(
+        s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
+          s"one method named 'eval' which is public, not abstract and " +
+          s"(in case of table functions) not static.")
+    } else {
+      methods
+    }
+  }
+
+  def getSignatures(function: UserDefinedFunction): Array[Array[Class[_]]] = {
+    checkAndExtractEvalMethods(function).map(_.getParameterTypes)
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Utilities for SQL functions
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Creates a [[SqlFunction]] for a [[ScalarFunction]].
+    *
+    * @param name function name
+    * @param function scalar function
+    * @param typeFactory type factory
+    * @return the ScalarSqlFunction
+    */
+  def createScalarSqlFunction(
+      name: String,
+      function: ScalarFunction,
+      typeFactory: FlinkTypeFactory)
+    : SqlFunction = {
+    new ScalarSqlFunction(name, function, typeFactory)
+  }
+
+  /**
+    * Creates a [[SqlFunction]] for every eval method of a [[TableFunction]].
+    *
+    * @param name function name
+    * @param tableFunction table function
+    * @param resultType the type information of returned table
+    * @param typeFactory type factory
+    * @return the TableSqlFunction
+    */
+  def createTableSqlFunctions(
+      name: String,
+      tableFunction: TableFunction[_],
+      resultType: TypeInformation[_],
+      typeFactory: FlinkTypeFactory)
+    : Seq[SqlFunction] = {
+    val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
+    val evalMethods = checkAndExtractEvalMethods(tableFunction)
+
+    evalMethods.map { method =>
+      val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, method)
+      TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Utilities for scalar functions
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses
+    * [[TypeExtractor]] as default return type inference.
+    */
+  def getResultType(
+      function: ScalarFunction,
+      signature: Array[Class[_]])
+    : TypeInformation[_] = {
+    // find method for signature
+    val evalMethod = checkAndExtractEvalMethods(function)
+      .find(m => signature.sameElements(m.getParameterTypes))
+      .getOrElse(throw new ValidationException("Given signature is invalid."))
+
+    val userDefinedTypeInfo = function.getResultType(signature)
+    if (userDefinedTypeInfo != null) {
+      userDefinedTypeInfo
+    } else {
+      try {
+        TypeExtractor.getForClass(evalMethod.getReturnType)
+      } catch {
+        case _: InvalidTypesException =>
+          throw new ValidationException(
+            s"Return type of scalar function '${function.getClass.getCanonicalName}' cannot be " +
+              s"automatically determined. Please provide type information manually.")
+      }
+    }
+  }
+
+  /**
+    * Returns the return type of the evaluation method matching the given signature.
+    */
+  def getResultTypeClass(
+      function: ScalarFunction,
+      signature: Array[Class[_]])
+    : Class[_] = {
+    // find method for signature
+    val evalMethod = checkAndExtractEvalMethods(function)
+      .find(m => signature.sameElements(m.getParameterTypes))
+      .getOrElse(throw new IllegalArgumentException("Given signature is invalid."))
+    evalMethod.getReturnType
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Miscellaneous
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract the field names and positions from.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo(inputType: TypeInformation[_])
+    : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
+
+    val fieldNames: Array[String] = inputType match {
+      case t: CompositeType[_] => t.getFieldNames
+      case a: AtomicType[_] => Array("f0")
+      case tpe =>
+        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
+          s"Type $tpe lacks explicit field naming")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+    val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { fieldName =>
+      inputType match {
+        case t: CompositeType[_] => t.getTypeAt(fieldName).asInstanceOf[TypeInformation[_]]
+        case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]]
+        case tpe =>
+          throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
+      }
+    }
+    (fieldNames, fieldIndexes, fieldTypes)
+  }
+
+  /**
+    * Prints one signature consisting of classes.
+    */
+  def signatureToString(signature: Array[Class[_]]): String =
+    signature.map { clazz =>
+      if (clazz == null) {
+        "null"
+      } else {
+        clazz.getCanonicalName
+      }
+    }.mkString("(", ", ", ")")
+
+  /**
+    * Prints one signature consisting of TypeInformation.
+    */
+  def signatureToString(signature: Seq[TypeInformation[_]]): String = {
+    signatureToString(typeInfoToClass(signature))
+  }
+
+  /**
+    * Prints all eval methods signatures of a class.
+    */
+  def signaturesToString(function: UserDefinedFunction): String = {
+    getSignatures(function).map(signatureToString).mkString(", ")
+  }
+
+  /**
+    * Extracts type classes of [[TypeInformation]] in a null-aware way.
+    */
+  private def typeInfoToClass(typeInfos: Seq[TypeInformation[_]]): Array[Class[_]] =
+    typeInfos.map { typeInfo =>
+      if (typeInfo == null) {
+        null
+      } else {
+        typeInfo.getTypeClass
+      }
+    }.toArray
+
+  /**
+    * Compares parameter candidate classes with expected classes. If true, the parameters match.
+    * Candidate can be null (acts as a wildcard).
+    */
+  private def parameterTypeEquals(candidate: Class[_], expected: Class[_]): Boolean =
+    candidate == null ||
+      candidate == expected ||
+      expected.isPrimitive && Primitives.wrap(expected) == candidate ||
+      candidate == classOf[Date] && expected == classOf[Int] ||
+      candidate == classOf[Time] && expected == classOf[Int] ||
+      candidate == classOf[Timestamp] && expected == classOf[Long]
+
+}
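To make the null-as-wildcard convention above concrete, a hedged sketch: a NULL operand yields
a null entry in the TypeInformation sequence, which parameterTypeEquals then accepts against
any declared parameter class (TrimFunc is hypothetical):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    import org.apache.flink.table.functions.ScalarFunction
    import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.getSignature

    class TrimFunc extends ScalarFunction {
      def eval(s: String): String = if (s == null) null else s.trim
    }

    val f = new TrimFunc
    getSignature(f, Seq(STRING_TYPE_INFO))  // Some(Array(classOf[String]))
    getSignature(f, Seq(null))              // Some(...): null acts as a wildcard
    getSignature(f, Seq(INT_TYPE_INFO))     // None: no matching signature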