You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:05 UTC

[36/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
deleted file mode 100644
index e0f4691..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.typeutils.TypeCheckUtils
-import org.apache.flink.api.table.validate._
-
-case class Abs(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = child.resultType
-
-  override private[flink] def validateInput(): ValidationResult =
-    TypeCheckUtils.assertNumericExpr(child.resultType, "Abs")
-
-  override def toString: String = s"abs($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.ABS, child.toRexNode)
-  }
-}
-
-case class Ceil(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult =
-    TypeCheckUtils.assertNumericExpr(child.resultType, "Ceil")
-
-  override def toString: String = s"ceil($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.CEIL, child.toRexNode)
-  }
-}
-
-case class Exp(child: Expression) extends UnaryExpression with InputTypeSpec {
-  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
-
-  override def toString: String = s"exp($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.EXP, child.toRexNode)
-  }
-}
-
-
-case class Floor(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult =
-    TypeCheckUtils.assertNumericExpr(child.resultType, "Floor")
-
-  override def toString: String = s"floor($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.FLOOR, child.toRexNode)
-  }
-}
-
-case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
-  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
-
-  override def toString: String = s"log10($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.LOG10, child.toRexNode)
-  }
-}
-
-case class Ln(child: Expression) extends UnaryExpression with InputTypeSpec {
-  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
-
-  override def toString: String = s"ln($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.LN, child.toRexNode)
-  }
-}
-
-case class Power(left: Expression, right: Expression) extends BinaryExpression with InputTypeSpec {
-  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
-    DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
-
-  override def toString: String = s"pow($left, $right)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.POWER, left.toRexNode, right.toRexNode)
-  }
-}
-
-case class Sqrt(child: Expression) extends UnaryExpression with InputTypeSpec {
-  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
-    Seq(DOUBLE_TYPE_INFO)
-
-  override def toString: String = s"sqrt($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.POWER, child.toRexNode, Literal(0.5).toRexNode)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
deleted file mode 100644
index c15d462..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.validate._
-
-abstract class Ordering extends UnaryExpression {
-  override private[flink] def validateInput(): ValidationResult = {
-    if (!child.isInstanceOf[NamedExpression]) {
-      ValidationFailure(s"Sort should only based on field reference")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-
-case class Asc(child: Expression) extends Ordering {
-  override def toString: String = s"($child).asc"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    child.toRexNode
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = child.resultType
-}
-
-case class Desc(child: Expression) extends Ordering {
-  override def toString: String = s"($child).desc"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.desc(child.toRexNode)
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = child.resultType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
deleted file mode 100644
index 2e5d0b2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * This package contains the base class of AST nodes and all the expression language AST classes.
- * Expression trees should not be manually constructed by users. They are implicitly constructed
- * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API,
- * expression trees should be generated from a string parser that parses expressions and creates
- * AST nodes.
- */
-package object expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
deleted file mode 100644
index 56b5b5e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import scala.collection.JavaConversions._
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.TrimMode.TrimMode
-import org.apache.flink.api.table.validate._
-
-/**
-  * Returns the length of this `str`.
-  */
-case class CharLength(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (child.resultType == STRING_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"CharLength operator requires String input, " +
-        s"but $child is of type ${child.resultType}")
-    }
-  }
-
-  override def toString: String = s"($child).charLength()"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.CHAR_LENGTH, child.toRexNode)
-  }
-}
-
-/**
-  * Returns str with the first letter of each word in uppercase.
-  * All other letters are in lowercase. Words are delimited by white space.
-  */
-case class InitCap(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (child.resultType == STRING_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"InitCap operator requires String input, " + 
-        s"but $child is of type ${child.resultType}")
-    }
-  }
-
-  override def toString: String = s"($child).initCap()"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.INITCAP, child.toRexNode)
-  }
-}
-
-/**
-  * Returns true if `str` matches `pattern`.
-  */
-case class Like(str: Expression, pattern: Expression) extends BinaryExpression {
-  private[flink] def left: Expression = str
-  private[flink] def right: Expression = pattern
-
-  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Like operator requires (String, String) input, " +
-        s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
-    }
-  }
-
-  override def toString: String = s"($str).like($pattern)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.LIKE, children.map(_.toRexNode))
-  }
-}
-
-/**
-  * Returns str with all characters changed to lowercase.
-  */
-case class Lower(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (child.resultType == STRING_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Lower operator requires String input, " +
-        s"but $child is of type ${child.resultType}")
-    }
-  }
-
-  override def toString: String = s"($child).toLowerCase()"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.LOWER, child.toRexNode)
-  }
-}
-
-/**
-  * Returns true if `str` is similar to `pattern`.
-  */
-case class Similar(str: Expression, pattern: Expression) extends BinaryExpression {
-  private[flink] def left: Expression = str
-  private[flink] def right: Expression = pattern
-
-  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Similar operator requires (String, String) input, " +
-        s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
-    }
-  }
-
-  override def toString: String = s"($str).similarTo($pattern)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.SIMILAR_TO, children.map(_.toRexNode))
-  }
-}
-
-/**
-  * Returns substring of `str` from `begin`(inclusive) for `length`.
-  */
-case class Substring(
-    str: Expression,
-    begin: Expression,
-    length: Expression) extends Expression with InputTypeSpec {
-
-  def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))
-
-  override private[flink] def children: Seq[Expression] = str :: begin :: length :: Nil
-
-  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
-    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
-
-  override def toString: String = s"($str).substring($begin, $length)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.SUBSTRING, children.map(_.toRexNode))
-  }
-}
-
-/**
-  * Trim `trimString` from `str` according to `trimMode`.
-  */
-case class Trim(
-    trimMode: Expression,
-    trimString: Expression,
-    str: Expression) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = trimMode :: trimString :: str :: Nil
-
-  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    trimMode match {
-      case SymbolExpression(_: TrimMode) =>
-        if (trimString.resultType != STRING_TYPE_INFO) {
-          ValidationFailure(s"String expected for trimString, get ${trimString.resultType}")
-        } else if (str.resultType != STRING_TYPE_INFO) {
-          ValidationFailure(s"String expected for str, get ${str.resultType}")
-        } else {
-          ValidationSuccess
-        }
-      case _ => ValidationFailure("TrimMode symbol expected.")
-    }
-  }
-
-  override def toString: String = s"($str).trim($trimMode, $trimString)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.TRIM, children.map(_.toRexNode))
-  }
-}
-
-/**
-  * Enumeration of trim flags.
-  */
-object TrimConstants {
-  val TRIM_DEFAULT_CHAR = Literal(" ")
-}
-
-/**
-  * Returns str with all characters changed to uppercase.
-  */
-case class Upper(child: Expression) extends UnaryExpression with InputTypeSpec {
-
-  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
-    Seq(STRING_TYPE_INFO)
-
-  override def toString: String = s"($child).upperCase()"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.UPPER, child.toRexNode)
-  }
-}
-
-/**
-  * Returns the position of string needle in string haystack.
-  */
-case class Position(needle: Expression, haystack: Expression)
-    extends Expression with InputTypeSpec {
-
-  override private[flink] def children: Seq[Expression] = Seq(needle, haystack)
-
-  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO)
-
-  override def toString: String = s"($needle).position($haystack)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.POSITION, needle.toRexNode, haystack.toRexNode)
-  }
-}
-
-/**
-  * Replaces a substring of a string with a replacement string.
-  * Starting at a position for a given length.
-  */
-case class Overlay(
-    str: Expression,
-    replacement: Expression,
-    starting: Expression,
-    position: Expression)
-  extends Expression with InputTypeSpec {
-
-  def this(str: Expression, replacement: Expression, starting: Expression) =
-    this(str, replacement, starting, CharLength(replacement))
-
-  override private[flink] def children: Seq[Expression] =
-    Seq(str, replacement, starting, position)
-
-  override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
-
-  override def toString: String = s"($str).overlay($replacement, $starting, $position)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(
-      SqlStdOperatorTable.OVERLAY,
-      str.toRexNode,
-      replacement.toRexNode,
-      starting.toRexNode,
-      position.toRexNode)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala
deleted file mode 100644
index dfa8820..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlTrimFunction
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.language.{existentials, implicitConversions}
-
-/**
-  * General expression class to represent a symbol.
-  */
-case class SymbolExpression(symbol: TableSymbol) extends LeafExpression {
-
-  override private[flink] def resultType: TypeInformation[_] =
-    throw new UnsupportedOperationException("This should not happen. A symbol has no result type.")
-
-  def toExpr = this // triggers implicit conversion
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    // dirty hack to pass Java enums to Java from Scala
-    val enum = symbol.enum.asInstanceOf[Enum[T] forSome { type T <: Enum[T] }]
-    relBuilder.getRexBuilder.makeFlag(enum)
-  }
-
-  override def toString: String = s"${symbol.symbols}.${symbol.name}"
-
-}
-
-/**
-  * Symbol that wraps a Calcite symbol in form of a Java enum.
-  */
-trait TableSymbol {
-  def symbols: TableSymbols
-  def name: String
-  def enum: Enum[_]
-}
-
-/**
-  * Enumeration of symbols.
-  */
-abstract class TableSymbols extends Enumeration {
-
-  class TableSymbolValue(e: Enum[_]) extends Val(e.name()) with TableSymbol {
-    override def symbols: TableSymbols = TableSymbols.this
-
-    override def enum: Enum[_] = e
-
-    override def name: String = toString()
-  }
-
-  protected final def Value(enum: Enum[_]): TableSymbolValue = new TableSymbolValue(enum)
-
-  implicit def symbolToExpression(symbol: TableSymbolValue): SymbolExpression =
-    SymbolExpression(symbol)
-
-}
-
-/**
-  * Units for working with time intervals.
-  */
-object TimeIntervalUnit extends TableSymbols {
-
-  type TimeIntervalUnit = TableSymbolValue
-
-  val YEAR = Value(TimeUnitRange.YEAR)
-  val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
-  val MONTH = Value(TimeUnitRange.MONTH)
-  val DAY = Value(TimeUnitRange.DAY)
-  val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
-  val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
-  val DAY_TO_SECOND = Value(TimeUnitRange.DAY_TO_SECOND)
-  val HOUR = Value(TimeUnitRange.HOUR)
-  val HOUR_TO_MINUTE = Value(TimeUnitRange.HOUR_TO_MINUTE)
-  val HOUR_TO_SECOND = Value(TimeUnitRange.HOUR_TO_SECOND)
-  val MINUTE = Value(TimeUnitRange.MINUTE)
-  val MINUTE_TO_SECOND = Value(TimeUnitRange.MINUTE_TO_SECOND)
-  val SECOND = Value(TimeUnitRange.SECOND)
-
-}
-
-/**
-  * Units for working with time points.
-  */
-object TimePointUnit extends TableSymbols {
-
-  type TimePointUnit = TableSymbolValue
-
-  val YEAR = Value(TimeUnit.YEAR)
-  val MONTH = Value(TimeUnit.MONTH)
-  val DAY = Value(TimeUnit.DAY)
-  val HOUR = Value(TimeUnit.HOUR)
-  val MINUTE = Value(TimeUnit.MINUTE)
-  val SECOND = Value(TimeUnit.SECOND)
-  val QUARTER = Value(TimeUnit.QUARTER)
-  val WEEK = Value(TimeUnit.WEEK)
-  val MILLISECOND = Value(TimeUnit.MILLISECOND)
-  val MICROSECOND = Value(TimeUnit.MICROSECOND)
-
-}
-
-/**
-  * Modes for trimming strings.
-  */
-object TrimMode extends TableSymbols {
-
-  type TrimMode = TableSymbolValue
-
-  val BOTH = Value(SqlTrimFunction.Flag.BOTH)
-  val LEADING = Value(SqlTrimFunction.Flag.LEADING)
-  val TRAILING = Value(SqlTrimFunction.Flag.TRAILING)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
deleted file mode 100644
index cd5ca0a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
-import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.FlinkRelBuilder
-import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, getFactor, mod}
-import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
-import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess}
-
-import scala.collection.JavaConversions._
-
-case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
-
-  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (!TypeCheckUtils.isTemporal(temporal.resultType)) {
-      return ValidationFailure(s"Extract operator requires Temporal input, " +
-        s"but $temporal is of type ${temporal.resultType}")
-    }
-
-    timeIntervalUnit match {
-      case SymbolExpression(TimeIntervalUnit.YEAR)
-           | SymbolExpression(TimeIntervalUnit.MONTH)
-           | SymbolExpression(TimeIntervalUnit.DAY)
-        if temporal.resultType == SqlTimeTypeInfo.DATE
-          || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
-          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
-          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
-        ValidationSuccess
-
-      case SymbolExpression(TimeIntervalUnit.HOUR)
-           | SymbolExpression(TimeIntervalUnit.MINUTE)
-           | SymbolExpression(TimeIntervalUnit.SECOND)
-        if temporal.resultType == SqlTimeTypeInfo.TIME
-          || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
-          || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
-        ValidationSuccess
-
-      case _ =>
-        ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" +
-          s" of type '${temporal.resultType}'.")
-    }
-  }
-
-  override def toString: String = s"($temporal).extract($timeIntervalUnit)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    // get wrapped Calcite unit
-    val timeUnitRange = timeIntervalUnit
-      .asInstanceOf[SymbolExpression]
-      .symbol
-      .enum
-      .asInstanceOf[TimeUnitRange]
-
-    // convert RexNodes
-    convertExtract(
-      timeIntervalUnit.toRexNode,
-      timeUnitRange,
-      temporal.toRexNode,
-      relBuilder.asInstanceOf[FlinkRelBuilder])
-  }
-
-  /**
-    * Standard conversion of the EXTRACT operator.
-    * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertExtract()]]
-    */
-  private def convertExtract(
-      timeUnitRangeRexNode: RexNode,
-      timeUnitRange: TimeUnitRange,
-      temporal: RexNode,
-      relBuilder: FlinkRelBuilder)
-    : RexNode = {
-
-    // TODO convert this into Table API expressions to make the code more readable
-    val rexBuilder = relBuilder.getRexBuilder
-    val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
-    var result = rexBuilder.makeReinterpretCast(
-      resultType,
-      temporal,
-      rexBuilder.makeLiteral(false))
-
-    val unit = timeUnitRange.startUnit
-    val sqlTypeName = temporal.getType.getSqlTypeName
-    unit match {
-      case TimeUnit.YEAR | TimeUnit.MONTH | TimeUnit.DAY =>
-        sqlTypeName match {
-          case SqlTypeName.TIMESTAMP =>
-            result = divide(rexBuilder, result, TimeUnit.DAY.multiplier)
-            return rexBuilder.makeCall(
-              resultType,
-              SqlStdOperatorTable.EXTRACT_DATE,
-              Seq(timeUnitRangeRexNode, result))
-
-          case SqlTypeName.DATE =>
-            return rexBuilder.makeCall(
-              resultType,
-              SqlStdOperatorTable.EXTRACT_DATE,
-              Seq(timeUnitRangeRexNode, result))
-
-          case _ => // do nothing
-        }
-
-      case _ => // do nothing
-    }
-
-    result = mod(rexBuilder, resultType, result, getFactor(unit))
-    result = divide(rexBuilder, result, unit.multiplier)
-    result
-  }
-}
-
-abstract class TemporalCeilFloor(
-    timeIntervalUnit: Expression,
-    temporal: Expression)
-  extends Expression {
-
-  override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
-
-  override private[flink] def resultType: TypeInformation[_] = temporal.resultType
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (!TypeCheckUtils.isTimePoint(temporal.resultType)) {
-      return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
-        s"but $temporal is of type ${temporal.resultType}")
-    }
-    val unit = timeIntervalUnit match {
-      case SymbolExpression(u: TimeIntervalUnit) => Some(u)
-      case _ => None
-    }
-    if (unit.isEmpty) {
-      return ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " +
-        s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}")
-    }
-
-    (unit.get, temporal.resultType) match {
-      case (TimeIntervalUnit.YEAR | TimeIntervalUnit.MONTH,
-          SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
-        ValidationSuccess
-      case (TimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
-        ValidationSuccess
-      case (TimeIntervalUnit.HOUR | TimeIntervalUnit.MINUTE | TimeIntervalUnit.SECOND,
-          SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
-        ValidationSuccess
-      case _ =>
-        ValidationFailure(s"Temporal ceil/floor operator does not support " +
-          s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.")
-    }
-  }
-}
-
-case class TemporalFloor(
-    timeIntervalUnit: Expression,
-    temporal: Expression)
-  extends TemporalCeilFloor(
-    timeIntervalUnit,
-    temporal) {
-
-  override def toString: String = s"($temporal).floor($timeIntervalUnit)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.FLOOR, temporal.toRexNode, timeIntervalUnit.toRexNode)
-  }
-}
-
-case class TemporalCeil(
-    timeIntervalUnit: Expression,
-    temporal: Expression)
-  extends TemporalCeilFloor(
-    timeIntervalUnit,
-    temporal) {
-
-  override def toString: String = s"($temporal).ceil($timeIntervalUnit)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.CEIL, temporal.toRexNode, timeIntervalUnit.toRexNode)
-  }
-}
-
-abstract class CurrentTimePoint(
-    targetType: TypeInformation[_],
-    local: Boolean)
-  extends LeafExpression {
-
-  override private[flink] def resultType: TypeInformation[_] = targetType
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (!TypeCheckUtils.isTimePoint(targetType)) {
-      ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " +
-        s"but get $targetType.")
-    } else if (local && targetType == SqlTimeTypeInfo.DATE) {
-      ValidationFailure(s"Localized CurrentTimePoint operator requires Time or Timestamp target " +
-        s"type, but get $targetType.")
-    } else {
-      ValidationSuccess
-    }
-  }
-
-  override def toString: String = if (local) {
-    s"local$targetType()"
-  } else {
-    s"current$targetType()"
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operator = targetType match {
-      case SqlTimeTypeInfo.TIME if local => SqlStdOperatorTable.LOCALTIME
-      case SqlTimeTypeInfo.TIMESTAMP if local => SqlStdOperatorTable.LOCALTIMESTAMP
-      case SqlTimeTypeInfo.DATE => SqlStdOperatorTable.CURRENT_DATE
-      case SqlTimeTypeInfo.TIME => SqlStdOperatorTable.CURRENT_TIME
-      case SqlTimeTypeInfo.TIMESTAMP => SqlStdOperatorTable.CURRENT_TIMESTAMP
-    }
-    relBuilder.call(operator)
-  }
-}
-
-case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
-
-case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
-
-case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
-
-case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
-
-case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
-
-/**
-  * Extracts the quarter of a year from a SQL date.
-  */
-case class Quarter(child: Expression) extends UnaryExpression with InputTypeSpec {
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(SqlTimeTypeInfo.DATE)
-
-  override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
-  override def toString: String = s"($child).quarter()"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    /**
-      * Standard conversion of the QUARTER operator.
-      * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertQuarter()]]
-      */
-    Plus(
-      Div(
-        Minus(
-          Extract(TimeIntervalUnit.MONTH, child),
-          Literal(1L)),
-        Literal(TimeUnit.QUARTER.multiplier.longValue())),
-      Literal(1L)
-    ).toRexNode
-  }
-}
-
-/**
-  * Determines whether two anchored time intervals overlap.
-  */
-case class TemporalOverlaps(
-    leftTimePoint: Expression,
-    leftTemporal: Expression,
-    rightTimePoint: Expression,
-    rightTemporal: Expression)
-  extends Expression {
-
-  override private[flink] def children: Seq[Expression] =
-    Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
-
-  override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (!TypeCheckUtils.isTimePoint(leftTimePoint.resultType)) {
-      return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " +
-        s"Time Point, but get ${leftTimePoint.resultType}.")
-    }
-    if (!TypeCheckUtils.isTimePoint(rightTimePoint.resultType)) {
-      return ValidationFailure(s"TemporalOverlaps operator requires rightTimePoint to be of " +
-        s"type Time Point, but get ${rightTimePoint.resultType}.")
-    }
-    if (leftTimePoint.resultType != rightTimePoint.resultType) {
-      return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint and " +
-        s"rightTimePoint to be of same type.")
-    }
-
-    // leftTemporal is point, then it must be comparable with leftTimePoint
-    if (TypeCheckUtils.isTimePoint(leftTemporal.resultType)) {
-      if (leftTemporal.resultType != leftTimePoint.resultType) {
-        return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal and " +
-          s"leftTimePoint to be of same type if leftTemporal is of type Time Point.")
-      }
-    } else if (!isTimeInterval(leftTemporal.resultType)) {
-      return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal to be of " +
-        s"type Time Point or Time Interval.")
-    }
-
-    // rightTemporal is point, then it must be comparable with rightTimePoint
-    if (TypeCheckUtils.isTimePoint(rightTemporal.resultType)) {
-      if (rightTemporal.resultType != rightTimePoint.resultType) {
-        return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal and " +
-          s"rightTimePoint to be of same type if rightTemporal is of type Time Point.")
-      }
-    } else if (!isTimeInterval(rightTemporal.resultType)) {
-      return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal to be of " +
-        s"type Time Point or Time Interval.")
-    }
-    ValidationSuccess
-  }
-
-  override def toString: String = s"temporalOverlaps(${children.mkString(", ")})"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    convertOverlaps(
-      leftTimePoint.toRexNode,
-      leftTemporal.toRexNode,
-      rightTimePoint.toRexNode,
-      rightTemporal.toRexNode,
-      relBuilder.asInstanceOf[FlinkRelBuilder])
-  }
-
-  /**
-    * Standard conversion of the OVERLAPS operator.
-    * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
-    */
-  private def convertOverlaps(
-      leftP: RexNode,
-      leftT: RexNode,
-      rightP: RexNode,
-      rightT: RexNode,
-      relBuilder: FlinkRelBuilder)
-    : RexNode = {
-    // leftT = leftP + leftT if leftT is an interval
-    val convLeftT = if (isTimeInterval(leftTemporal.resultType)) {
-        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, leftP, leftT)
-      } else {
-        leftT
-      }
-    // rightT = rightP + rightT if rightT is an interval
-    val convRightT = if (isTimeInterval(rightTemporal.resultType)) {
-        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, rightP, rightT)
-      } else {
-        rightT
-      }
-    // leftT >= rightP
-    val leftPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convLeftT, rightP)
-    // rightT >= leftP
-    val rightPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convRightT, leftP)
-
-    // leftT >= rightP and rightT >= leftP
-    relBuilder.call(SqlStdOperatorTable.AND, leftPred, rightPred)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
deleted file mode 100644
index 8386c46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
-
-abstract class WindowProperty(child: Expression) extends UnaryExpression {
-
-  override def toString = s"WindowProperty($child)"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    throw new UnsupportedOperationException("WindowProperty cannot be transformed to RexNode.")
-
-  override private[flink] def validateInput() =
-    if (child.isInstanceOf[WindowReference]) {
-      ValidationSuccess
-    } else {
-      ValidationFailure("Child must be a window reference.")
-    }
-
-  private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder)
-    : NamedWindowProperty = NamedWindowProperty(name, this)
-}
-
-case class WindowStart(child: Expression) extends WindowProperty(child) {
-
-  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
-
-  override def toString: String = s"start($child)"
-}
-
-case class WindowEnd(child: Expression) extends WindowProperty(child) {
-
-  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
-
-  override def toString: String = s"end($child)"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
deleted file mode 100644
index 2e16096..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions
-
-import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.ValidationException
-import org.apache.flink.api.table.expressions.{Expression, ScalarFunctionCall}
-
-/**
-  * Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one,
-  * or multiple scalar values to a new scalar value.
-  *
-  * The behavior of a [[ScalarFunction]] can be defined by implementing a custom evaluation
-  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
-  * can also be overloaded by implementing multiple methods named "eval".
-  *
-  * User-defined functions must have a default constructor and must be instantiable during runtime.
-  *
-  * By default the result type of an evaluation method is determined by Flink's type extraction
-  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
-  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
-  * can be manually defined by overriding [[getResultType()]].
-  *
-  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
-  * If a user-defined scalar function should not introduce much overhead during runtime, it is
-  * recommended to declare parameters and result types as primitive types instead of their boxed
-  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
-  */
-abstract class ScalarFunction extends UserDefinedFunction {
-
-  /**
-    * Creates a call to a [[ScalarFunction]] in Scala Table API.
-    *
-    * @param params actual parameters of function
-    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
-    */
-  final def apply(params: Expression*): Expression = {
-    ScalarFunctionCall(this, params)
-  }
-
-  override def toString: String = getClass.getCanonicalName
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns the result type of the evaluation method with a given signature.
-    *
-    * This method needs to be overriden in case Flink's type extraction facilities are not
-    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
-    * method. Flink's type extraction facilities can handle basic types or
-    * simple POJOs but might be wrong for more complex, custom, or composite types.
-    *
-    * @param signature signature of the method the return type needs to be determined
-    * @return [[TypeInformation]] of result type or null if Flink should determine the type
-    */
-  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
-
-  /**
-    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
-    * signature.
-    *
-    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
-    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
-    * By default Flink's type extraction facilities are used for this but might be wrong for
-    * more complex, custom, or composite types.
-    *
-    * @param signature signature of the method the operand types need to be determined
-    * @return [[TypeInformation]] of  operand types
-    */
-  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
-    signature.map { c =>
-      try {
-        TypeExtractor.getForClass(c)
-      } catch {
-        case ite: InvalidTypesException =>
-          throw new ValidationException(
-            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
-            s"automatically determined. Please provide type information manually.")
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
deleted file mode 100644
index ca9aaf1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
-
-/**
-  * Base class for a user-defined table function (UDTF). A user-defined table functions works on
-  * zero, one, or multiple scalar values as input and returns multiple rows as output.
-  *
-  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
-  * method. An evaluation method must be declared publicly, not static and named "eval".
-  * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
-  *
-  * User-defined functions must have a default constructor and must be instantiable during runtime.
-  *
-  * By default the result type of an evaluation method is determined by Flink's type extraction
-  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
-  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
-  * can be manually defined by overriding [[getResultType()]].
-  *
-  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
-  * If a user-defined table function should not introduce much overhead during runtime, it is
-  * recommended to declare parameters and result types as primitive types instead of their boxed
-  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
-  *
-  * Example:
-  *
-  * {{{
-  *
-  *   public class Split extends TableFunction<String> {
-  *
-  *     // implement an "eval" method with as many parameters as you want
-  *     public void eval(String str) {
-  *       for (String s : str.split(" ")) {
-  *         collect(s);   // use collect(...) to emit an output row
-  *       }
-  *     }
-  *
-  *     // you can overload the eval method here ...
-  *   }
-  *
-  *   val tEnv: TableEnvironment = ...
-  *   val table: Table = ...    // schema: [a: String]
-  *
-  *   // for Scala users
-  *   val split = new Split()
-  *   table.join(split('c) as ('s)).select('a, 's)
-  *
-  *   // for Java users
-  *   tEnv.registerFunction("split", new Split())   // register table function first
-  *   table.join("split(a) as (s)").select("a, s")
-  *
-  *   // for SQL users
-  *   tEnv.registerFunction("split", new Split())   // register table function first
-  *   tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
-  *
-  * }}}
-  *
-  * @tparam T The type of the output row
-  */
-abstract class TableFunction[T] extends UserDefinedFunction {
-
-  /**
-    * Creates a call to a [[TableFunction]] in Scala Table API.
-    *
-    * @param params actual parameters of function
-    * @return [[Expression]] in form of a [[TableFunctionCall]]
-    */
-  final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
-    val resultType = if (getResultType == null) {
-      typeInfo
-    } else {
-      getResultType
-    }
-    TableFunctionCall(getClass.getSimpleName, this, params, resultType)
-  }
-
-  override def toString: String = getClass.getCanonicalName
-
-  // ----------------------------------------------------------------------------------------------
-
-  private val rows: util.ArrayList[T] = new util.ArrayList[T]()
-
-  /**
-    * Emit an output row.
-    *
-    * @param row the output row
-    */
-  protected def collect(row: T): Unit = {
-    // cache rows for now, maybe immediately process them further
-    rows.add(row)
-  }
-
-  /**
-    * Internal use. Get an iterator of the buffered rows.
-    */
-  def getRowsIterator = rows.iterator()
-
-  /**
-    * Internal use. Clear buffered rows.
-    */
-  def clear() = rows.clear()
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns the result type of the evaluation method with a given signature.
-    *
-    * This method needs to be overriden in case Flink's type extraction facilities are not
-    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
-    * method. Flink's type extraction facilities can handle basic types or
-    * simple POJOs but might be wrong for more complex, custom, or composite types.
-    *
-    * @return [[TypeInformation]] of result type or null if Flink should determine the type
-    */
-  def getResultType: TypeInformation[T] = null
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala
deleted file mode 100644
index cdf6b07..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.functions
-
-/**
-  * Base class for all user-defined functions such as scalar functions, table functions,
-  * or aggregation functions.
-  *
-  * User-defined functions must have a default constructor and must be instantiable during runtime.
-  */
-trait UserDefinedFunction {
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
deleted file mode 100644
index 8a0fe65..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions.utils
-
-import java.math.{BigDecimal => JBigDecimal}
-
-class MathFunctions {}
-
-object MathFunctions {
-  def power(a: Double, b: JBigDecimal): Double = {
-    Math.pow(a, b.doubleValue())
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
deleted file mode 100644
index 7953b25..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions.utils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.functions.utils.ScalarSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{getResultType, getSignature, getSignatures, signatureToString, signaturesToString}
-import org.apache.flink.api.table.{FlinkTypeFactory, ValidationException}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Calcite wrapper for user-defined scalar functions.
-  *
-  * @param name function name (used by SQL parser)
-  * @param scalarFunction scalar function to be called
-  * @param typeFactory type factory for converting Flink's between Calcite's types
-  */
-class ScalarSqlFunction(
-    name: String,
-    scalarFunction: ScalarFunction,
-    typeFactory: FlinkTypeFactory)
-  extends SqlFunction(
-    new SqlIdentifier(name, SqlParserPos.ZERO),
-    createReturnTypeInference(name, scalarFunction, typeFactory),
-    createOperandTypeInference(scalarFunction, typeFactory),
-    createOperandTypeChecker(name, scalarFunction),
-    null,
-    SqlFunctionCategory.USER_DEFINED_FUNCTION) {
-
-  def getScalarFunction = scalarFunction
-
-}
-
-object ScalarSqlFunction {
-
-  private[flink] def createReturnTypeInference(
-      name: String,
-      scalarFunction: ScalarFunction,
-      typeFactory: FlinkTypeFactory)
-    : SqlReturnTypeInference = {
-    /**
-      * Return type inference based on [[ScalarFunction]] given information.
-      */
-    new SqlReturnTypeInference {
-      override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
-        val parameters = opBinding
-          .collectOperandTypes()
-          .asScala
-          .map { operandType =>
-            if (operandType.getSqlTypeName == SqlTypeName.NULL) {
-              null
-            } else {
-              FlinkTypeFactory.toTypeInfo(operandType)
-            }
-          }
-        val foundSignature = getSignature(scalarFunction, parameters)
-        if (foundSignature.isEmpty) {
-          throw new ValidationException(
-            s"Given parameters of function '$name' do not match any signature. \n" +
-              s"Actual: ${signatureToString(parameters)} \n" +
-              s"Expected: ${signaturesToString(scalarFunction)}")
-        }
-        val resultType = getResultType(scalarFunction, foundSignature.get)
-        typeFactory.createTypeFromTypeInfo(resultType)
-      }
-    }
-  }
-
-  private[flink] def createOperandTypeInference(
-      scalarFunction: ScalarFunction,
-      typeFactory: FlinkTypeFactory)
-    : SqlOperandTypeInference = {
-    /**
-      * Operand type inference based on [[ScalarFunction]] given information.
-      */
-    new SqlOperandTypeInference {
-      override def inferOperandTypes(
-          callBinding: SqlCallBinding,
-          returnType: RelDataType,
-          operandTypes: Array[RelDataType]): Unit = {
-
-        val operandTypeInfo = getOperandTypeInfo(callBinding)
-
-        val foundSignature = getSignature(scalarFunction, operandTypeInfo)
-          .getOrElse(throw new ValidationException(s"Operand types of could not be inferred."))
-
-        val inferredTypes = scalarFunction
-          .getParameterTypes(foundSignature)
-          .map(typeFactory.createTypeFromTypeInfo)
-
-        inferredTypes.zipWithIndex.foreach {
-          case (inferredType, i) =>
-            operandTypes(i) = inferredType
-        }
-      }
-    }
-  }
-
-  private[flink] def createOperandTypeChecker(
-      name: String,
-      scalarFunction: ScalarFunction)
-    : SqlOperandTypeChecker = {
-
-    val signatures = getSignatures(scalarFunction)
-
-    /**
-      * Operand type checker based on [[ScalarFunction]] given information.
-      */
-    new SqlOperandTypeChecker {
-      override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
-        s"$opName[${signaturesToString(scalarFunction)}]"
-      }
-
-      override def getOperandCountRange: SqlOperandCountRange = {
-        val signatureLengths = signatures.map(_.length)
-        SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
-      }
-
-      override def checkOperandTypes(
-          callBinding: SqlCallBinding,
-          throwOnFailure: Boolean)
-        : Boolean = {
-        val operandTypeInfo = getOperandTypeInfo(callBinding)
-
-        val foundSignature = getSignature(scalarFunction, operandTypeInfo)
-
-        if (foundSignature.isEmpty) {
-          if (throwOnFailure) {
-            throw new ValidationException(
-              s"Given parameters of function '$name' do not match any signature. \n" +
-                s"Actual: ${signatureToString(operandTypeInfo)} \n" +
-                s"Expected: ${signaturesToString(scalarFunction)}")
-          } else {
-            false
-          }
-        } else {
-          true
-        }
-      }
-
-      override def isOptional(i: Int): Boolean = false
-
-      override def getConsistency: Consistency = Consistency.NONE
-
-    }
-  }
-
-  private[flink] def getOperandTypeInfo(callBinding: SqlCallBinding): Seq[TypeInformation[_]] = {
-    val operandTypes = for (i <- 0 until callBinding.getOperandCount)
-      yield callBinding.getOperandType(i)
-    operandTypes.map { operandType =>
-      if (operandType.getSqlTypeName == SqlTypeName.NULL) {
-        null
-      } else {
-        FlinkTypeFactory.toTypeInfo(operandType)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
deleted file mode 100644
index 738238d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions.utils
-
-import com.google.common.base.Predicate
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
-import org.apache.calcite.util.Util
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
-
-import scala.collection.JavaConverters._
-import java.util
-
-/**
-  * Calcite wrapper for user-defined table functions.
-  */
-class TableSqlFunction(
-    name: String,
-    udtf: TableFunction[_],
-    rowTypeInfo: TypeInformation[_],
-    returnTypeInference: SqlReturnTypeInference,
-    operandTypeInference: SqlOperandTypeInference,
-    operandTypeChecker: SqlOperandTypeChecker,
-    paramTypes: util.List[RelDataType],
-    functionImpl: FlinkTableFunctionImpl[_])
-  extends SqlUserDefinedTableFunction(
-    new SqlIdentifier(name, SqlParserPos.ZERO),
-    returnTypeInference,
-    operandTypeInference,
-    operandTypeChecker,
-    paramTypes,
-    functionImpl) {
-
-  /**
-    * Get the user-defined table function.
-    */
-  def getTableFunction = udtf
-
-  /**
-    * Get the type information of the table returned by the table function.
-    */
-  def getRowTypeInfo = rowTypeInfo
-
-  /**
-    * Get additional mapping information if the returned table type is a POJO
-    * (POJO types have no deterministic field order).
-    */
-  def getPojoFieldMapping = functionImpl.fieldIndexes
-
-}
-
-object TableSqlFunction {
-
-  /**
-    * Util function to create a [[TableSqlFunction]].
-    *
-    * @param name function name (used by SQL parser)
-    * @param udtf user-defined table function to be called
-    * @param rowTypeInfo the row type information generated by the table function
-    * @param typeFactory type factory for converting Flink's between Calcite's types
-    * @param functionImpl Calcite table function schema
-    * @return [[TableSqlFunction]]
-    */
-  def apply(
-    name: String,
-    udtf: TableFunction[_],
-    rowTypeInfo: TypeInformation[_],
-    typeFactory: FlinkTypeFactory,
-    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
-
-    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
-    val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
-    // derives operands' data types and type families
-    functionImpl.getParameters.asScala.foreach{ o =>
-      val relType: RelDataType = o.getType(typeFactory)
-      argTypes.add(relType)
-      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
-    }
-    // derives whether the 'input'th parameter of a method is optional.
-    val optional: Predicate[Integer] = new Predicate[Integer]() {
-      def apply(input: Integer): Boolean = {
-        functionImpl.getParameters.get(input).isOptional
-      }
-    }
-    // create type check for the operands
-    val typeChecker: FamilyOperandTypeChecker = OperandTypes.family(typeFamilies, optional)
-
-    new TableSqlFunction(
-      name,
-      udtf,
-      rowTypeInfo,
-      ReturnTypes.CURSOR,
-      InferTypes.explicit(argTypes),
-      typeChecker,
-      argTypes,
-      functionImpl)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
deleted file mode 100644
index 4899691..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.table.functions.utils
-
-import java.lang.reflect.{Method, Modifier}
-import java.sql.{Date, Time, Timestamp}
-
-import com.google.common.primitives.Primitives
-import org.apache.calcite.sql.SqlFunction
-import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException, ValidationException}
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction}
-import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
-import org.apache.flink.util.InstantiationUtil
-
-object UserDefinedFunctionUtils {
-
-  /**
-    * Instantiates a user-defined function.
-    */
-  def instantiate[T <: UserDefinedFunction](clazz: Class[T]): T = {
-    val constructor = clazz.getDeclaredConstructor()
-    constructor.setAccessible(true)
-    constructor.newInstance()
-  }
-
-  /**
-    * Checks if a user-defined function can be easily instantiated.
-    */
-  def checkForInstantiation(clazz: Class[_]): Unit = {
-    if (!InstantiationUtil.isPublic(clazz)) {
-      throw ValidationException("Function class is not public.")
-    }
-    else if (!InstantiationUtil.isProperClass(clazz)) {
-      throw ValidationException("Function class is no proper class, it is either abstract," +
-        " an interface, or a primitive type.")
-    }
-    else if (InstantiationUtil.isNonStaticInnerClass(clazz)) {
-      throw ValidationException("The class is an inner class, but not statically accessible.")
-    }
-
-    // check for default constructor (can be private)
-    clazz
-      .getDeclaredConstructors
-      .find(_.getParameterTypes.isEmpty)
-      .getOrElse(throw ValidationException("Function class needs a default constructor."))
-  }
-
-  /**
-    * Check whether this is a Scala object. It is forbidden to use [[TableFunction]] implemented
-    * by a Scala object, since concurrent risks.
-    */
-  def checkNotSingleton(clazz: Class[_]): Unit = {
-    // TODO it is not a good way to check singleton. Maybe improve it further.
-    if (clazz.getFields.map(_.getName) contains "MODULE$") {
-      throw new ValidationException(
-        s"TableFunction implemented by class ${clazz.getCanonicalName} " +
-          s"is a Scala object, it is forbidden since concurrent risks.")
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------
-  // Utilities for eval methods
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns signatures matching the given signature of [[TypeInformation]].
-    * Elements of the signature can be null (act as a wildcard).
-    */
-  def getSignature(
-      function: UserDefinedFunction,
-      signature: Seq[TypeInformation[_]])
-    : Option[Array[Class[_]]] = {
-    // We compare the raw Java classes not the TypeInformation.
-    // TypeInformation does not matter during runtime (e.g. within a MapFunction).
-    val actualSignature = typeInfoToClass(signature)
-    val signatures = getSignatures(function)
-
-    signatures
-      // go over all signatures and find one matching actual signature
-      .find { curSig =>
-      // match parameters of signature to actual parameters
-      actualSignature.length == curSig.length &&
-        curSig.zipWithIndex.forall { case (clazz, i) =>
-          parameterTypeEquals(actualSignature(i), clazz)
-        }
-    }
-  }
-
-  /**
-    * Returns eval method matching the given signature of [[TypeInformation]].
-    */
-  def getEvalMethod(
-      function: UserDefinedFunction,
-      signature: Seq[TypeInformation[_]])
-    : Option[Method] = {
-    // We compare the raw Java classes not the TypeInformation.
-    // TypeInformation does not matter during runtime (e.g. within a MapFunction).
-    val actualSignature = typeInfoToClass(signature)
-    val evalMethods = checkAndExtractEvalMethods(function)
-
-    evalMethods
-      // go over all eval methods and find one matching
-      .find { cur =>
-      val signatures = cur.getParameterTypes
-      // match parameters of signature to actual parameters
-      actualSignature.length == signatures.length &&
-        signatures.zipWithIndex.forall { case (clazz, i) =>
-          parameterTypeEquals(actualSignature(i), clazz)
-        }
-    }
-  }
-
-  /**
-    * Extracts "eval" methods and throws a [[ValidationException]] if no implementation
-    * can be found.
-    */
-  def checkAndExtractEvalMethods(function: UserDefinedFunction): Array[Method] = {
-    val methods = function
-      .getClass
-      .getDeclaredMethods
-      .filter { m =>
-        val modifiers = m.getModifiers
-        m.getName == "eval" &&
-          Modifier.isPublic(modifiers) &&
-          !Modifier.isAbstract(modifiers) &&
-          !(function.isInstanceOf[TableFunction[_]] && Modifier.isStatic(modifiers))
-      }
-
-    if (methods.isEmpty) {
-      throw new ValidationException(
-        s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
-          s"one method named 'eval' which is public, not abstract and " +
-          s"(in case of table functions) not static.")
-    } else {
-      methods
-    }
-  }
-
-  def getSignatures(function: UserDefinedFunction): Array[Array[Class[_]]] = {
-    checkAndExtractEvalMethods(function).map(_.getParameterTypes)
-  }
-
-  // ----------------------------------------------------------------------------------------------
-  // Utilities for SQL functions
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Create [[SqlFunction]] for a [[ScalarFunction]]
-    * @param name function name
-    * @param function scalar function
-    * @param typeFactory type factory
-    * @return the ScalarSqlFunction
-    */
-  def createScalarSqlFunction(
-      name: String,
-      function: ScalarFunction,
-      typeFactory: FlinkTypeFactory)
-    : SqlFunction = {
-    new ScalarSqlFunction(name, function, typeFactory)
-  }
-
-  /**
-    * Create [[SqlFunction]]s for a [[TableFunction]]'s every eval method
-    * @param name function name
-    * @param tableFunction table function
-    * @param resultType the type information of returned table
-    * @param typeFactory type factory
-    * @return the TableSqlFunction
-    */
-  def createTableSqlFunctions(
-      name: String,
-      tableFunction: TableFunction[_],
-      resultType: TypeInformation[_],
-      typeFactory: FlinkTypeFactory)
-    : Seq[SqlFunction] = {
-    val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
-    val evalMethods = checkAndExtractEvalMethods(tableFunction)
-
-    evalMethods.map { method =>
-      val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, method)
-      TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------
-  // Utilities for scalar functions
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses
-    * [[TypeExtractor]] as default return type inference.
-    */
-  def getResultType(
-      function: ScalarFunction,
-      signature: Array[Class[_]])
-    : TypeInformation[_] = {
-    // find method for signature
-    val evalMethod = checkAndExtractEvalMethods(function)
-      .find(m => signature.sameElements(m.getParameterTypes))
-      .getOrElse(throw new ValidationException("Given signature is invalid."))
-
-    val userDefinedTypeInfo = function.getResultType(signature)
-    if (userDefinedTypeInfo != null) {
-      userDefinedTypeInfo
-    } else {
-      try {
-        TypeExtractor.getForClass(evalMethod.getReturnType)
-      } catch {
-        case ite: InvalidTypesException =>
-          throw new ValidationException(
-            s"Return type of scalar function '${function.getClass.getCanonicalName}' cannot be " +
-              s"automatically determined. Please provide type information manually.")
-      }
-    }
-  }
-
-  /**
-    * Returns the return type of the evaluation method matching the given signature.
-    */
-  def getResultTypeClass(
-      function: ScalarFunction,
-      signature: Array[Class[_]])
-    : Class[_] = {
-    // find method for signature
-    val evalMethod = checkAndExtractEvalMethods(function)
-      .find(m => signature.sameElements(m.getParameterTypes))
-      .getOrElse(throw new IllegalArgumentException("Given signature is invalid."))
-    evalMethod.getReturnType
-  }
-
-  // ----------------------------------------------------------------------------------------------
-  // Miscellaneous
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns field names and field positions for a given [[TypeInformation]].
-    *
-    * Field names are automatically extracted for
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    *
-    * @param inputType The TypeInformation to extract the field names and positions from.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  def getFieldInfo(inputType: TypeInformation[_])
-    : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
-
-    val fieldNames: Array[String] = inputType match {
-      case t: CompositeType[_] => t.getFieldNames
-      case a: AtomicType[_] => Array("f0")
-      case tpe =>
-        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
-          s"Type $tpe lacks explicit field naming")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
-    val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { i =>
-      inputType match {
-        case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]]
-        case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]]
-        case tpe =>
-          throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
-      }
-    }
-    (fieldNames, fieldIndexes, fieldTypes)
-  }
-
-  /**
-    * Prints one signature consisting of classes.
-    */
-  def signatureToString(signature: Array[Class[_]]): String =
-  signature.map { clazz =>
-    if (clazz == null) {
-      "null"
-    } else {
-      clazz.getCanonicalName
-    }
-  }.mkString("(", ", ", ")")
-
-  /**
-    * Prints one signature consisting of TypeInformation.
-    */
-  def signatureToString(signature: Seq[TypeInformation[_]]): String = {
-    signatureToString(typeInfoToClass(signature))
-  }
-
-  /**
-    * Prints all eval methods signatures of a class.
-    */
-  def signaturesToString(function: UserDefinedFunction): String = {
-    getSignatures(function).map(signatureToString).mkString(", ")
-  }
-
-  /**
-    * Extracts type classes of [[TypeInformation]] in a null-aware way.
-    */
-  private def typeInfoToClass(typeInfos: Seq[TypeInformation[_]]): Array[Class[_]] =
-  typeInfos.map { typeInfo =>
-    if (typeInfo == null) {
-      null
-    } else {
-      typeInfo.getTypeClass
-    }
-  }.toArray
-
-
-  /**
-    * Compares parameter candidate classes with expected classes. If true, the parameters match.
-    * Candidate can be null (acts as a wildcard).
-    */
-  private def parameterTypeEquals(candidate: Class[_], expected: Class[_]): Boolean =
-  candidate == null ||
-    candidate == expected ||
-    expected.isPrimitive && Primitives.wrap(expected) == candidate ||
-    candidate == classOf[Date] && expected == classOf[Int] ||
-    candidate == classOf[Time] && expected == classOf[Int] ||
-    candidate == classOf[Timestamp] && expected == classOf[Long]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
deleted file mode 100644
index bdcb22c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api
-
-/**
- * == Table API ==
- *
- * This package contains the generic part of the Table API. It can be used with Flink Streaming
- * and Flink Batch. From Scala as well as from Java.
- *
- * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from
- * a DataSet or DataStream. On this relational operations can be performed. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package object table