You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:07 UTC

[38/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
deleted file mode 100644
index d41e9a7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.calcite.avatica.util.TimeUnitRange.{MONTH, YEAR}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO}
-import org.apache.flink.api.table.codegen.CodeGenUtils.{getEnum, primitiveTypeTermForTypeInfo, qualifyMethod}
-import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
-  * Generates floor/ceil function calls.
-  */
-class FloorCeilCallGen(
-    arithmeticMethod: Method,
-    temporalMethod: Option[Method] = None)
-  extends MultiTypeMethodCallGen(arithmeticMethod) {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = operands.size match {
-    // arithmetic
-    case 1 =>
-      operands.head.resultType match {
-        case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
-          super.generate(codeGenerator, operands)
-        case _ =>
-          operands.head // no floor/ceil necessary
-      }
-
-    // temporal
-    case 2 =>
-      val operand = operands.head
-      val unit = getEnum(operands(1)).asInstanceOf[TimeUnitRange]
-      val internalType = primitiveTypeTermForTypeInfo(operand.resultType)
-
-      generateCallIfArgsNotNull(codeGenerator.nullCheck, operand.resultType, operands) {
-        (terms) =>
-          unit match {
-            case YEAR | MONTH =>
-              s"""
-                |($internalType) ${qualifyMethod(temporalMethod.get)}(${terms(1)}, ${terms.head})
-                |""".stripMargin
-            case _ =>
-              s"""
-                |${qualifyMethod(arithmeticMethod)}(
-                |  ($internalType) ${terms.head},
-                |  ($internalType) ${unit.startUnit.multiplier.intValue()})
-                |""".stripMargin
-          }
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala
deleted file mode 100644
index 9b144ba..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.calcite.sql.fun.SqlTrimFunction
-import org.apache.calcite.util.BuiltInMethod
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.api.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
-
-import scala.collection.mutable
-
-/**
-  * Global hub for user-defined and built-in advanced SQL functions.
-  */
-object FunctionGenerator {
-
-  private val sqlFunctions: mutable.Map[(SqlOperator, Seq[TypeInformation[_]]), CallGenerator] =
-    mutable.Map()
-
-  // ----------------------------------------------------------------------------------------------
-  // String functions
-  // ----------------------------------------------------------------------------------------------
-
-  addSqlFunctionMethod(
-    SUBSTRING,
-    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
-    STRING_TYPE_INFO,
-    BuiltInMethod.SUBSTRING.method)
-
-  addSqlFunctionMethod(
-    SUBSTRING,
-    Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
-    STRING_TYPE_INFO,
-    BuiltInMethod.SUBSTRING.method)
-
-  addSqlFunction(
-    TRIM,
-    Seq(new GenericTypeInfo(classOf[SqlTrimFunction.Flag]), STRING_TYPE_INFO, STRING_TYPE_INFO),
-    new TrimCallGen())
-
-  addSqlFunctionMethod(
-    CHAR_LENGTH,
-    Seq(STRING_TYPE_INFO),
-    INT_TYPE_INFO,
-    BuiltInMethod.CHAR_LENGTH.method)
-
-  addSqlFunctionMethod(
-    CHARACTER_LENGTH,
-    Seq(STRING_TYPE_INFO),
-    INT_TYPE_INFO,
-    BuiltInMethod.CHAR_LENGTH.method)
-
-  addSqlFunctionMethod(
-    UPPER,
-    Seq(STRING_TYPE_INFO),
-    STRING_TYPE_INFO,
-    BuiltInMethod.UPPER.method)
-
-  addSqlFunctionMethod(
-    LOWER,
-    Seq(STRING_TYPE_INFO),
-    STRING_TYPE_INFO,
-    BuiltInMethod.LOWER.method)
-
-  addSqlFunctionMethod(
-    INITCAP,
-    Seq(STRING_TYPE_INFO),
-    STRING_TYPE_INFO,
-    BuiltInMethod.INITCAP.method)
-
-  addSqlFunctionMethod(
-    LIKE,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
-    BOOLEAN_TYPE_INFO,
-    BuiltInMethod.LIKE.method)
-
-  addSqlFunctionMethod(
-    LIKE,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
-    BOOLEAN_TYPE_INFO,
-    BuiltInMethods.LIKE_WITH_ESCAPE)
-
-  addSqlFunctionNotMethod(
-    NOT_LIKE,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
-    BuiltInMethod.LIKE.method)
-
-  addSqlFunctionMethod(
-    SIMILAR_TO,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
-    BOOLEAN_TYPE_INFO,
-    BuiltInMethod.SIMILAR.method)
-
-  addSqlFunctionMethod(
-    SIMILAR_TO,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
-    BOOLEAN_TYPE_INFO,
-    BuiltInMethods.SIMILAR_WITH_ESCAPE)
-
-  addSqlFunctionNotMethod(
-    NOT_SIMILAR_TO,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
-    BuiltInMethod.SIMILAR.method)
-
-  addSqlFunctionMethod(
-    POSITION,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
-    INT_TYPE_INFO,
-    BuiltInMethod.POSITION.method)
-
-  addSqlFunctionMethod(
-    OVERLAY,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO),
-    STRING_TYPE_INFO,
-    BuiltInMethod.OVERLAY.method)
-
-  addSqlFunctionMethod(
-    OVERLAY,
-    Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
-    STRING_TYPE_INFO,
-    BuiltInMethod.OVERLAY.method)
-
-  // ----------------------------------------------------------------------------------------------
-  // Arithmetic functions
-  // ----------------------------------------------------------------------------------------------
-
-  addSqlFunctionMethod(
-    LOG10,
-    Seq(DOUBLE_TYPE_INFO),
-    DOUBLE_TYPE_INFO,
-    BuiltInMethods.LOG10)
-
-  addSqlFunctionMethod(
-    LN,
-    Seq(DOUBLE_TYPE_INFO),
-    DOUBLE_TYPE_INFO,
-    BuiltInMethods.LN)
-
-  addSqlFunctionMethod(
-    EXP,
-    Seq(DOUBLE_TYPE_INFO),
-    DOUBLE_TYPE_INFO,
-    BuiltInMethods.EXP)
-
-  addSqlFunctionMethod(
-    POWER,
-    Seq(DOUBLE_TYPE_INFO, DOUBLE_TYPE_INFO),
-    DOUBLE_TYPE_INFO,
-    BuiltInMethods.POWER)
-
-  addSqlFunctionMethod(
-    POWER,
-    Seq(DOUBLE_TYPE_INFO, BIG_DEC_TYPE_INFO),
-    DOUBLE_TYPE_INFO,
-    BuiltInMethods.POWER_DEC)
-
-  addSqlFunction(
-    ABS,
-    Seq(DOUBLE_TYPE_INFO),
-    new MultiTypeMethodCallGen(BuiltInMethods.ABS))
-
-  addSqlFunction(
-    ABS,
-    Seq(BIG_DEC_TYPE_INFO),
-    new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC))
-
-  addSqlFunction(
-    FLOOR,
-    Seq(DOUBLE_TYPE_INFO),
-    new FloorCeilCallGen(BuiltInMethod.FLOOR.method))
-
-  addSqlFunction(
-    FLOOR,
-    Seq(BIG_DEC_TYPE_INFO),
-    new FloorCeilCallGen(BuiltInMethod.FLOOR.method))
-
-  addSqlFunction(
-    CEIL,
-    Seq(DOUBLE_TYPE_INFO),
-    new FloorCeilCallGen(BuiltInMethod.CEIL.method))
-
-  addSqlFunction(
-    CEIL,
-    Seq(BIG_DEC_TYPE_INFO),
-    new FloorCeilCallGen(BuiltInMethod.CEIL.method))
-
-  // ----------------------------------------------------------------------------------------------
-  // Temporal functions
-  // ----------------------------------------------------------------------------------------------
-
-  addSqlFunctionMethod(
-    EXTRACT_DATE,
-    Seq(new GenericTypeInfo(classOf[TimeUnitRange]), LONG_TYPE_INFO),
-    LONG_TYPE_INFO,
-    BuiltInMethod.UNIX_DATE_EXTRACT.method)
-
-  addSqlFunctionMethod(
-    EXTRACT_DATE,
-    Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.DATE),
-    LONG_TYPE_INFO,
-    BuiltInMethod.UNIX_DATE_EXTRACT.method)
-
-  addSqlFunction(
-    FLOOR,
-    Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
-    new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
-      Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
-
-  addSqlFunction(
-    FLOOR,
-    Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
-    new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
-      Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
-
-  addSqlFunction(
-    FLOOR,
-    Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
-    new FloorCeilCallGen(
-      BuiltInMethod.FLOOR.method,
-      Some(BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method)))
-
-  addSqlFunction(
-    CEIL,
-    Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
-    new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
-      Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
-
-  addSqlFunction(
-    CEIL,
-    Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
-    new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
-      Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
-
-  addSqlFunction(
-    CEIL,
-    Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
-    new FloorCeilCallGen(
-      BuiltInMethod.CEIL.method,
-      Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method)))
-
-  addSqlFunction(
-    CURRENT_DATE,
-    Seq(),
-    new CurrentTimePointCallGen(SqlTimeTypeInfo.DATE, local = false))
-
-  addSqlFunction(
-    CURRENT_TIME,
-    Seq(),
-    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = false))
-
-  addSqlFunction(
-    CURRENT_TIMESTAMP,
-    Seq(),
-    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = false))
-
-  addSqlFunction(
-    LOCALTIME,
-    Seq(),
-    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = true))
-
-  addSqlFunction(
-    LOCALTIMESTAMP,
-    Seq(),
-    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns a [[CallGenerator]] that generates all required code for calling the given
-    * [[SqlOperator]].
-    *
-    * @param sqlOperator SQL operator (might be overloaded)
-    * @param operandTypes actual operand types
-    * @param resultType expected return type
-    * @return [[CallGenerator]]
-    */
-  def getCallGenerator(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      resultType: TypeInformation[_])
-    : Option[CallGenerator] = sqlOperator match {
-
-    // user-defined scalar function
-    case ssf: ScalarSqlFunction =>
-      Some(
-        new ScalarFunctionCallGen(
-          ssf.getScalarFunction,
-          operandTypes,
-          resultType
-        )
-      )
-
-    // user-defined table function
-    case tsf: TableSqlFunction =>
-      Some(
-        new TableFunctionCallGen(
-          tsf.getTableFunction,
-          operandTypes,
-          resultType
-        )
-      )
-
-    // built-in scalar function
-    case _ =>
-      sqlFunctions.get((sqlOperator, operandTypes))
-        .orElse(sqlFunctions.find(entry => entry._1._1 == sqlOperator
-          && entry._1._2.length == operandTypes.length
-          && entry._1._2.zip(operandTypes).forall {
-          case (x: BasicTypeInfo[_], y: BasicTypeInfo[_]) => y.shouldAutocastTo(x) || x == y
-          case _ => false
-        }).map(_._2))
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  private def addSqlFunctionMethod(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      returnType: TypeInformation[_],
-      method: Method)
-    : Unit = {
-    sqlFunctions((sqlOperator, operandTypes)) = new MethodCallGen(returnType, method)
-  }
-
-  private def addSqlFunctionNotMethod(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      method: Method)
-    : Unit = {
-    sqlFunctions((sqlOperator, operandTypes)) =
-      new NotCallGenerator(new MethodCallGen(BOOLEAN_TYPE_INFO, method))
-  }
-
-  private def addSqlFunction(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      callGenerator: CallGenerator)
-    : Unit = {
-    sqlFunctions((sqlOperator, operandTypes)) = callGenerator
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala
deleted file mode 100644
index 376f54a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils.qualifyMethod
-import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
-  * Generates a function call by using a [[java.lang.reflect.Method]].
-  */
-class MethodCallGen(returnType: TypeInformation[_], method: Method) extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    generateCallIfArgsNotNull(codeGenerator.nullCheck, returnType, operands) {
-      (terms) =>
-        s"""
-          |${qualifyMethod(method)}(${terms.mkString(", ")})
-          |""".stripMargin
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala
deleted file mode 100644
index e9e8f18..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.table.codegen.calls.CallGenerator._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
-  * Generates a function call that calls a method which returns the same type that it
-  * takes as first argument.
-  */
-class MultiTypeMethodCallGen(method: Method) extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    generateCallIfArgsNotNull(codeGenerator.nullCheck, operands.head.resultType, operands) {
-      (operandResultTerms) =>
-        s"""
-          |${method.getDeclaringClass.getCanonicalName}.
-          |  ${method.getName}(${operandResultTerms.mkString(", ")})
-         """.stripMargin
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
deleted file mode 100644
index 5cd358a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.table.codegen.calls.ScalarOperators.generateNot
-import org.apache.flink.api.table.codegen.{GeneratedExpression, CodeGenerator}
-
-/**
-  * Inverts the boolean value of a CallGenerator result.
-  */
-class NotCallGenerator(callGenerator: CallGenerator) extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    val expr = callGenerator.generate(codeGenerator, operands)
-    generateNot(codeGenerator.nullCheck, expr)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala
deleted file mode 100644
index b6ef8ad..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
-
-/**
-  * Generates a call to user-defined [[ScalarFunction]].
-  *
-  * @param scalarFunction user-defined [[ScalarFunction]] that might be overloaded
-  * @param signature actual signature with which the function is called
-  * @param returnType actual return type required by the surrounding
-  */
-class ScalarFunctionCallGen(
-    scalarFunction: ScalarFunction,
-    signature: Seq[TypeInformation[_]],
-    returnType: TypeInformation[_])
-  extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    // determine function signature and result class
-    val matchingSignature = getSignature(scalarFunction, signature)
-      .getOrElse(throw new CodeGenException("No matching signature found."))
-    val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
-
-    // convert parameters for function (output boxing)
-    val parameters = matchingSignature
-        .zip(operands)
-        .map { case (paramClass, operandExpr) =>
-          if (paramClass.isPrimitive) {
-            operandExpr
-          } else {
-            val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
-            val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
-            val exprOrNull: String = if (codeGenerator.nullCheck) {
-              s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
-            } else {
-              boxedExpr.resultTerm
-            }
-            boxedExpr.copy(resultTerm = exprOrNull)
-          }
-        }
-
-    // generate function call
-    val functionReference = codeGenerator.addReusableFunction(scalarFunction)
-    val resultTypeTerm = if (resultClass.isPrimitive) {
-      primitiveTypeTermForTypeInfo(returnType)
-    } else {
-      boxedTypeTermForTypeInfo(returnType)
-    }
-    val resultTerm = newName("result")
-    val functionCallCode =
-      s"""
-        |${parameters.map(_.code).mkString("\n")}
-        |$resultTypeTerm $resultTerm = $functionReference.eval(
-        |  ${parameters.map(_.resultTerm).mkString(", ")});
-        |""".stripMargin
-
-    // convert result of function to internal representation (input unboxing)
-    val resultUnboxing = if (resultClass.isPrimitive) {
-      codeGenerator.generateNonNullLiteral(returnType, resultTerm)
-    } else {
-      codeGenerator.generateInputFieldUnboxing(returnType, resultTerm)
-    }
-    resultUnboxing.copy(code =
-      s"""
-        |$functionCallCode
-        |${resultUnboxing.code}
-        |""".stripMargin
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
deleted file mode 100644
index 330e2fe..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
+++ /dev/null
@@ -1,1025 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
-import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
-import org.apache.calcite.util.BuiltInMethod
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-
-object ScalarOperators {
-
-  def generateStringConcatOperator(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    generateOperatorIfNotNull(nullCheck, STRING_TYPE_INFO, left, right) {
-      (leftTerm, rightTerm) => s"$leftTerm + $rightTerm"
-    }
-  }
-
-  def generateArithmeticOperator(
-      operator: String,
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    val leftCasting = numericCasting(left.resultType, resultType)
-    val rightCasting = numericCasting(right.resultType, resultType)
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-
-    generateOperatorIfNotNull(nullCheck, resultType, left, right) {
-      (leftTerm, rightTerm) =>
-        if (isDecimal(resultType)) {
-          s"${leftCasting(leftTerm)}.${arithOpToDecMethod(operator)}(${rightCasting(rightTerm)})"
-        } else {
-          s"($resultTypeTerm) (${leftCasting(leftTerm)} $operator ${rightCasting(rightTerm)})"
-        }
-    }
-  }
-
-  def generateUnaryArithmeticOperator(
-      operator: String,
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
-      (operandTerm) =>
-        if (isDecimal(operand.resultType) && operator == "-") {
-          s"$operandTerm.negate()"
-        } else if (isDecimal(operand.resultType) && operator == "+") {
-          s"$operandTerm"
-        } else {
-          s"$operator($operandTerm)"
-        }
-    }
-  }
-
-  def generateEquals(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    // numeric types
-    if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
-      generateComparison("==", nullCheck, left, right)
-    }
-    // temporal types
-    else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
-      generateComparison("==", nullCheck, left, right)
-    }
-    // array types
-    else if (isArray(left.resultType) && left.resultType == right.resultType) {
-      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-        (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)"
-      }
-    }
-    // comparable types of same type
-    else if (isComparable(left.resultType) && left.resultType == right.resultType) {
-      generateComparison("==", nullCheck, left, right)
-    }
-    // non comparable types
-    else {
-      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-        if (isReference(left)) {
-          (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
-        }
-        else if (isReference(right)) {
-          (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
-        }
-        else {
-          throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
-            s"${right.resultType}")
-        }
-      }
-    }
-  }
-
-  def generateNotEquals(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    // numeric types
-    if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
-      generateComparison("!=", nullCheck, left, right)
-    }
-    // temporal types
-    else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
-      generateComparison("!=", nullCheck, left, right)
-    }
-    // array types
-    else if (isArray(left.resultType) && left.resultType == right.resultType) {
-      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-        (leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, $rightTerm)"
-      }
-    }
-    // comparable types
-    else if (isComparable(left.resultType) && left.resultType == right.resultType) {
-      generateComparison("!=", nullCheck, left, right)
-    }
-    // non-comparable types
-    else {
-      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-        if (isReference(left)) {
-          (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
-        }
-        else if (isReference(right)) {
-          (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
-        }
-        else {
-          throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
-            s"${right.resultType}")
-        }
-      }
-    }
-  }
-
-  /**
-    * Generates comparison code for numeric types and comparable types of same type.
-    */
-  def generateComparison(
-      operator: String,
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-      // left is decimal or both sides are decimal
-      if (isDecimal(left.resultType) && isNumeric(right.resultType)) {
-        (leftTerm, rightTerm) => {
-          val operandCasting = numericCasting(right.resultType, left.resultType)
-          s"$leftTerm.compareTo(${operandCasting(rightTerm)}) $operator 0"
-        }
-      }
-      // right is decimal
-      else if (isNumeric(left.resultType) && isDecimal(right.resultType)) {
-        (leftTerm, rightTerm) => {
-          val operandCasting = numericCasting(left.resultType, right.resultType)
-          s"${operandCasting(leftTerm)}.compareTo($rightTerm) $operator 0"
-        }
-      }
-      // both sides are numeric
-      else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
-        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
-      }
-      // both sides are temporal of same type
-      else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
-        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
-      }
-      // both sides are boolean
-      else if (isBoolean(left.resultType) && left.resultType == right.resultType) {
-        operator match {
-          case "==" | "!=" => (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
-          case _ => throw new CodeGenException(s"Unsupported boolean comparison '$operator'.")
-        }
-      }
-      // both sides are same comparable type
-      else if (isComparable(left.resultType) && left.resultType == right.resultType) {
-        (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
-      }
-      else {
-        throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
-            s"${right.resultType}")
-      }
-    }
-  }
-
-  def generateIsNull(
-      nullCheck: Boolean,
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val operatorCode = if (nullCheck) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = ${operand.nullTerm};
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else if (!nullCheck && isReference(operand)) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = ${operand.resultTerm} == null;
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = false;
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateIsNotNull(
-      nullCheck: Boolean,
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val operatorCode = if (nullCheck) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = !${operand.nullTerm};
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else if (!nullCheck && isReference(operand)) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = ${operand.resultTerm} != null;
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = true;
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateAnd(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-
-    val operatorCode = if (nullCheck) {
-      // Three-valued logic:
-      // no Unknown -> Two-valued logic
-      // True && Unknown -> Unknown
-      // False && Unknown -> False
-      // Unknown && True -> Unknown
-      // Unknown && False -> False
-      // Unknown && Unknown -> Unknown
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm;
-        |boolean $nullTerm;
-        |if (!${left.nullTerm} && !${right.nullTerm}) {
-        |  $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = false;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = false;
-        |}
-        |else {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateOr(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-
-    val operatorCode = if (nullCheck) {
-      // Three-valued logic:
-      // no Unknown -> Two-valued logic
-      // True && Unknown -> True
-      // False && Unknown -> Unknown
-      // Unknown && True -> True
-      // Unknown && False -> Unknown
-      // Unknown && Unknown -> Unknown
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm;
-        |boolean $nullTerm;
-        |if (!${left.nullTerm} && !${right.nullTerm}) {
-        |  $resultTerm = ${left.resultTerm} || ${right.resultTerm};
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = true;
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
-        |  $resultTerm = true;
-        |  $nullTerm = false;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateNot(
-      nullCheck: Boolean,
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    // Three-valued logic:
-    // no Unknown -> Two-valued logic
-    // Unknown -> Unknown
-    generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
-      (operandTerm) => s"!($operandTerm)"
-    }
-  }
-
-  def generateIsTrue(operand: GeneratedExpression): GeneratedExpression = {
-    GeneratedExpression(
-      operand.resultTerm, // unknown is always false by default
-      GeneratedExpression.NEVER_NULL,
-      operand.code,
-      BOOLEAN_TYPE_INFO)
-  }
-
-  def generateIsNotTrue(operand: GeneratedExpression): GeneratedExpression = {
-    GeneratedExpression(
-      s"(!${operand.resultTerm})", // unknown is always false by default
-      GeneratedExpression.NEVER_NULL,
-      operand.code,
-      BOOLEAN_TYPE_INFO)
-  }
-
-  def generateIsFalse(operand: GeneratedExpression): GeneratedExpression = {
-    GeneratedExpression(
-      s"(!${operand.resultTerm} && !${operand.nullTerm})",
-      GeneratedExpression.NEVER_NULL,
-      operand.code,
-      BOOLEAN_TYPE_INFO)
-  }
-
-  def generateIsNotFalse(operand: GeneratedExpression): GeneratedExpression = {
-    GeneratedExpression(
-      s"(${operand.resultTerm} || ${operand.nullTerm})",
-      GeneratedExpression.NEVER_NULL,
-      operand.code,
-      BOOLEAN_TYPE_INFO)
-  }
-
-  def generateCast(
-      nullCheck: Boolean,
-      operand: GeneratedExpression,
-      targetType: TypeInformation[_])
-    : GeneratedExpression = (operand.resultType, targetType) match {
-    // identity casting
-    case (fromTp, toTp) if fromTp == toTp =>
-      operand
-
-    // Date/Time/Timestamp -> String
-    case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"${internalToTimePointCode(dtt, operandTerm)}.toString()"
-      }
-
-    // Interval Months -> String
-    case (TimeIntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) =>
-      val method = qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method)
-      val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH)
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"$method($operandTerm, $timeUnitRange)"
-      }
-
-    // Interval Millis -> String
-    case (TimeIntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) =>
-      val method = qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method)
-      val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND)
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"$method($operandTerm, $timeUnitRange, 3)" // milli second precision
-      }
-
-    // Object array -> String
-    case (_:ObjectArrayTypeInfo[_, _], STRING_TYPE_INFO) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"java.util.Arrays.deepToString($operandTerm)"
-      }
-
-    // Primitive array -> String
-    case (_:PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"java.util.Arrays.toString($operandTerm)"
-      }
-
-    // * (not Date/Time/Timestamp) -> String
-    case (_, STRING_TYPE_INFO) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s""" "" + $operandTerm"""
-      }
-
-    // * -> Character
-    case (_, CHAR_TYPE_INFO) =>
-      throw new CodeGenException("Character type not supported.")
-
-    // String -> NUMERIC TYPE (not Character), Boolean
-    case (STRING_TYPE_INFO, _: NumericTypeInfo[_])
-        | (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
-      val wrapperClass = targetType.getTypeClass.getCanonicalName
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"$wrapperClass.valueOf($operandTerm)"
-      }
-
-    // String -> BigDecimal
-    case (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
-      val wrapperClass = targetType.getTypeClass.getCanonicalName
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"new $wrapperClass($operandTerm)"
-      }
-
-    // String -> Date
-    case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_DATE.method)}($operandTerm)"
-      }
-
-    // String -> Time
-    case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIME.method)}($operandTerm)"
-      }
-
-    // String -> Timestamp
-    case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIMESTAMP.method)}" +
-          s"($operandTerm)"
-      }
-
-    // Boolean -> NUMERIC TYPE
-    case (BOOLEAN_TYPE_INFO, nti: NumericTypeInfo[_]) =>
-      val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)"
-      }
-
-    // Boolean -> BigDecimal
-    case (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"$operandTerm ? java.math.BigDecimal.ONE : java.math.BigDecimal.ZERO"
-      }
-
-    // NUMERIC TYPE -> Boolean
-    case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"$operandTerm != 0"
-      }
-
-    // BigDecimal -> Boolean
-    case (BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"$operandTerm.compareTo(java.math.BigDecimal.ZERO) != 0"
-      }
-
-    // NUMERIC TYPE, BigDecimal -> NUMERIC TYPE, BigDecimal
-    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_])
-        | (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_])
-        | (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) =>
-      val operandCasting = numericCasting(operand.resultType, targetType)
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"${operandCasting(operandTerm)}"
-      }
-
-    // Date -> Timestamp
-    case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) =>
-          s"$operandTerm * ${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY"
-      }
-
-    // Timestamp -> Date
-    case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
-      val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) =>
-          s"($targetTypeTerm) ($operandTerm / " +
-            s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
-      }
-
-    // Time -> Timestamp
-    case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.TIMESTAMP) =>
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"$operandTerm"
-      }
-
-    // Timestamp -> Time
-    case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIME) =>
-      val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
-      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) =>
-          s"($targetTypeTerm) ($operandTerm % " +
-            s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
-      }
-
-    // internal temporal casting
-    // Date -> Integer
-    // Time -> Integer
-    // Timestamp -> Long
-    // Integer -> Date
-    // Integer -> Time
-    // Long -> Timestamp
-    // Integer -> Interval Months
-    // Long -> Interval Millis
-    // Interval Months -> Integer
-    // Interval Millis -> Long
-    case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) |
-         (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) |
-         (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) |
-         (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) |
-         (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) |
-         (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) |
-         (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
-         (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
-         (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) |
-         (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) =>
-      internalExprCasting(operand, targetType)
-
-    // internal reinterpretation of temporal types
-    // Date, Time, Interval Months -> Long
-    case  (SqlTimeTypeInfo.DATE, LONG_TYPE_INFO)
-        | (SqlTimeTypeInfo.TIME, LONG_TYPE_INFO)
-        | (TimeIntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) =>
-      internalExprCasting(operand, targetType)
-
-    case (from, to) =>
-      throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.")
-  }
-
-  def generateIfElse(
-      nullCheck: Boolean,
-      operands: Seq[GeneratedExpression],
-      resultType: TypeInformation[_],
-      i: Int = 0)
-    : GeneratedExpression = {
-    // else part
-    if (i == operands.size - 1) {
-      generateCast(nullCheck, operands(i), resultType)
-    }
-    else {
-      // check that the condition is boolean
-      // we do not check for null instead we use the default value
-      // thus null is false
-      requireBoolean(operands(i))
-      val condition = operands(i)
-      val trueAction = generateCast(nullCheck, operands(i + 1), resultType)
-      val falseAction = generateIfElse(nullCheck, operands, resultType, i + 2)
-
-      val resultTerm = newName("result")
-      val nullTerm = newName("isNull")
-      val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-
-      val operatorCode = if (nullCheck) {
-        s"""
-          |${condition.code}
-          |$resultTypeTerm $resultTerm;
-          |boolean $nullTerm;
-          |if (${condition.resultTerm}) {
-          |  ${trueAction.code}
-          |  $resultTerm = ${trueAction.resultTerm};
-          |  $nullTerm = ${trueAction.nullTerm};
-          |}
-          |else {
-          |  ${falseAction.code}
-          |  $resultTerm = ${falseAction.resultTerm};
-          |  $nullTerm = ${falseAction.nullTerm};
-          |}
-          |""".stripMargin
-      }
-      else {
-        s"""
-          |${condition.code}
-          |$resultTypeTerm $resultTerm;
-          |if (${condition.resultTerm}) {
-          |  ${trueAction.code}
-          |  $resultTerm = ${trueAction.resultTerm};
-          |}
-          |else {
-          |  ${falseAction.code}
-          |  $resultTerm = ${falseAction.resultTerm};
-          |}
-          |""".stripMargin
-      }
-
-      GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
-    }
-  }
-
-  def generateTemporalPlusMinus(
-      plus: Boolean,
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-
-    val op = if (plus) "+" else "-"
-
-    (left.resultType, right.resultType) match {
-      case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r =>
-        generateArithmeticOperator(op, nullCheck, l, left, right)
-
-      case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) {
-            (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))"
-        }
-
-      case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
-        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) {
-            (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $op($r))"
-        }
-
-      case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, right) {
-            (l, r) => s"$l $op ((int) ($r))"
-        }
-
-      case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) {
-          (l, r) => s"$l $op $r"
-        }
-
-      case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
-        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) {
-          (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $op($r))"
-        }
-
-      case _ =>
-        throw new CodeGenException("Unsupported temporal arithmetic.")
-    }
-  }
-
-  def generateUnaryIntervalPlusMinus(
-      plus: Boolean,
-      nullCheck: Boolean,
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    val operator = if (plus) "+" else "-"
-    generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, operand)
-  }
-
-  def generateArray(
-      codeGenerator: CodeGenerator,
-      resultType: TypeInformation[_],
-      elements: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    val arrayTerm = codeGenerator.addReusableArray(resultType.getTypeClass, elements.size)
-
-    val boxedElements: Seq[GeneratedExpression] = resultType match {
-
-      case oati: ObjectArrayTypeInfo[_, _] =>
-        // we box the elements to also represent null values
-        val boxedTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
-
-        elements.map { e =>
-          val boxedExpr = codeGenerator.generateOutputFieldBoxing(e)
-          val exprOrNull: String = if (codeGenerator.nullCheck) {
-            s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
-          } else {
-            boxedExpr.resultTerm
-          }
-          boxedExpr.copy(resultTerm = exprOrNull)
-        }
-
-      // no boxing necessary
-      case _: PrimitiveArrayTypeInfo[_] => elements
-    }
-
-    val code = boxedElements
-      .zipWithIndex
-      .map { case (element, idx) =>
-        s"""
-          |${element.code}
-          |$arrayTerm[$idx] = ${element.resultTerm};
-          |""".stripMargin
-      }
-      .mkString("\n")
-
-    GeneratedExpression(arrayTerm, GeneratedExpression.NEVER_NULL, code, resultType)
-  }
-
-  def generateArrayElementAt(
-      codeGenerator: CodeGenerator,
-      array: GeneratedExpression,
-      index: GeneratedExpression)
-    : GeneratedExpression = {
-
-    val resultTerm = newName("result")
-
-    array.resultType match {
-
-      // unbox object array types
-      case oati: ObjectArrayTypeInfo[_, _] =>
-        // get boxed array element
-        val resultTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
-
-        val arrayAccessCode = if (codeGenerator.nullCheck) {
-          s"""
-            |${array.code}
-            |${index.code}
-            |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ?
-            |  null : ${array.resultTerm}[${index.resultTerm} - 1];
-            |""".stripMargin
-        } else {
-          s"""
-            |${array.code}
-            |${index.code}
-            |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1];
-            |""".stripMargin
-        }
-
-        // generate unbox code
-        val unboxing = codeGenerator.generateInputFieldUnboxing(oati.getComponentInfo, resultTerm)
-
-        unboxing.copy(code =
-          s"""
-            |$arrayAccessCode
-            |${unboxing.code}
-            |""".stripMargin
-        )
-
-      // no unboxing necessary
-      case pati: PrimitiveArrayTypeInfo[_] =>
-        generateOperatorIfNotNull(codeGenerator.nullCheck, pati.getComponentType, array, index) {
-          (leftTerm, rightTerm) => s"$leftTerm[$rightTerm - 1]"
-        }
-    }
-  }
-
-  def generateArrayElement(
-      codeGenerator: CodeGenerator,
-      array: GeneratedExpression)
-    : GeneratedExpression = {
-
-    val nullTerm = newName("isNull")
-    val resultTerm = newName("result")
-    val resultType = array.resultType match {
-      case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
-      case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
-    }
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val defaultValue = primitiveDefaultValue(resultType)
-
-    val arrayLengthCode = if (codeGenerator.nullCheck) {
-      s"${array.nullTerm} ? 0 : ${array.resultTerm}.length"
-    } else {
-      s"${array.resultTerm}.length"
-    }
-
-    val arrayAccessCode = array.resultType match {
-      case oati: ObjectArrayTypeInfo[_, _] =>
-        // generate unboxing code
-        val unboxing = codeGenerator.generateInputFieldUnboxing(
-          oati.getComponentInfo,
-          s"${array.resultTerm}[0]")
-
-        s"""
-          |${array.code}
-          |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
-          |$resultTypeTerm $resultTerm;
-          |switch ($arrayLengthCode) {
-          |  case 0:
-          |    ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
-          |    $resultTerm = $defaultValue;
-          |    break;
-          |  case 1:
-          |    ${unboxing.code}
-          |    ${if (codeGenerator.nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else "" }
-          |    $resultTerm = ${unboxing.resultTerm};
-          |    break;
-          |  default:
-          |    throw new RuntimeException("Array has more than one element.");
-          |}
-          |""".stripMargin
-
-      case pati: PrimitiveArrayTypeInfo[_] =>
-        s"""
-          |${array.code}
-          |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
-          |$resultTypeTerm $resultTerm;
-          |switch ($arrayLengthCode) {
-          |  case 0:
-          |    ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
-          |    $resultTerm = $defaultValue;
-          |    break;
-          |  case 1:
-          |    ${if (codeGenerator.nullCheck) s"$nullTerm = false;" else "" }
-          |    $resultTerm = ${array.resultTerm}[0];
-          |    break;
-          |  default:
-          |    throw new RuntimeException("Array has more than one element.");
-          |}
-          |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, arrayAccessCode, resultType)
-  }
-
-  def generateArrayCardinality(
-      nullCheck: Boolean,
-      array: GeneratedExpression)
-    : GeneratedExpression = {
-
-    generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, array) {
-      (operandTerm) => s"${array.resultTerm}.length"
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  private def generateUnaryOperatorIfNotNull(
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      operand: GeneratedExpression)
-      (expr: (String) => String)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val defaultValue = primitiveDefaultValue(resultType)
-
-    val operatorCode = if (nullCheck) {
-      s"""
-        |${operand.code}
-        |$resultTypeTerm $resultTerm;
-        |boolean $nullTerm;
-        |if (!${operand.nullTerm}) {
-        |  $resultTerm = ${expr(operand.resultTerm)};
-        |  $nullTerm = false;
-        |}
-        |else {
-        |  $resultTerm = $defaultValue;
-        |  $nullTerm = true;
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${operand.code}
-        |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
-  }
-
-  private def generateOperatorIfNotNull(
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-      (expr: (String, String) => String)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val defaultValue = primitiveDefaultValue(resultType)
-
-    val resultCode = if (nullCheck) {
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
-        |$resultTypeTerm $resultTerm;
-        |if ($nullTerm) {
-        |  $resultTerm = $defaultValue;
-        |}
-        |else {
-        |  $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${left.code}
-        |${right.code}
-        |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
-  }
-
-  private def internalExprCasting(
-      expr: GeneratedExpression,
-      typeInfo: TypeInformation[_])
-    : GeneratedExpression = {
-    GeneratedExpression(expr.resultTerm, expr.nullTerm, expr.code, typeInfo)
-  }
-
-  private def arithOpToDecMethod(operator: String): String = operator match {
-    case "+" => "add"
-    case "-" => "subtract"
-    case "*" => "multiply"
-    case "/" => "divide"
-    case "%" => "remainder"
-    case _ => throw new CodeGenException("Unsupported decimal arithmetic operator.")
-  }
-
-  private def numericCasting(
-      operandType: TypeInformation[_],
-      resultType: TypeInformation[_])
-    : (String) => String = {
-
-    def decToPrimMethod(targetType: TypeInformation[_]): String = targetType match {
-      case BYTE_TYPE_INFO => "byteValueExact"
-      case SHORT_TYPE_INFO => "shortValueExact"
-      case INT_TYPE_INFO => "intValueExact"
-      case LONG_TYPE_INFO => "longValueExact"
-      case FLOAT_TYPE_INFO => "floatValue"
-      case DOUBLE_TYPE_INFO => "doubleValue"
-      case _ => throw new CodeGenException("Unsupported decimal casting type.")
-    }
-
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    // no casting necessary
-    if (operandType == resultType) {
-      (operandTerm) => s"$operandTerm"
-    }
-    // result type is decimal but numeric operand is not
-    else if (isDecimal(resultType) && !isDecimal(operandType) && isNumeric(operandType)) {
-      (operandTerm) =>
-        s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)"
-    }
-    // numeric result type is not decimal but operand is
-    else if (isNumeric(resultType) && !isDecimal(resultType) && isDecimal(operandType) ) {
-      (operandTerm) => s"$operandTerm.${decToPrimMethod(resultType)}()"
-    }
-    // result type and operand type are numeric but not decimal
-    else if (isNumeric(operandType) && isNumeric(resultType)
-        && !isDecimal(operandType) && !isDecimal(resultType)) {
-      (operandTerm) => s"(($resultTypeTerm) $operandTerm)"
-    }
-    else {
-      throw new CodeGenException(s"Unsupported casting from $operandType to $resultType.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
deleted file mode 100644
index 37e70e4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.GeneratedExpression.NEVER_NULL
-import org.apache.flink.api.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
-
-/**
-  * Generates a call to user-defined [[TableFunction]].
-  *
-  * @param tableFunction user-defined [[TableFunction]] that might be overloaded
-  * @param signature actual signature with which the function is called
-  * @param returnType actual return type required by the surrounding
-  */
-class TableFunctionCallGen(
-    tableFunction: TableFunction[_],
-    signature: Seq[TypeInformation[_]],
-    returnType: TypeInformation[_])
-  extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    // determine function signature
-    val matchingSignature = getSignature(tableFunction, signature)
-      .getOrElse(throw new CodeGenException("No matching signature found."))
-
-    // convert parameters for function (output boxing)
-    val parameters = matchingSignature
-        .zip(operands)
-        .map { case (paramClass, operandExpr) =>
-          if (paramClass.isPrimitive) {
-            operandExpr
-          } else {
-            val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
-            val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
-            val exprOrNull: String = if (codeGenerator.nullCheck) {
-              s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
-            } else {
-              boxedExpr.resultTerm
-            }
-            boxedExpr.copy(resultTerm = exprOrNull)
-          }
-        }
-
-    // generate function call
-    val functionReference = codeGenerator.addReusableFunction(tableFunction)
-    val functionCallCode =
-      s"""
-        |${parameters.map(_.code).mkString("\n")}
-        |$functionReference.clear();
-        |$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")});
-        |""".stripMargin
-
-    // has no result
-    GeneratedExpression(
-      functionReference,
-      NEVER_NULL,
-      functionCallCode,
-      returnType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala
deleted file mode 100644
index 678016b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING}
-import org.apache.calcite.util.BuiltInMethod
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.calls.CallGenerator._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
-  * Generates a TRIM function call.
-  *
-  * First operand: trim mode (see [[org.apache.calcite.sql.fun.SqlTrimFunction.Flag]])
-  * Second operand: String to be removed
-  * Third operand: String to be trimmed
-  */
-class TrimCallGen extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) {
-      (terms) =>
-        val leading = compareEnum(terms.head, BOTH) || compareEnum(terms.head, LEADING)
-        val trailing = compareEnum(terms.head, BOTH) || compareEnum(terms.head, TRAILING)
-        s"""
-          |${qualifyMethod(BuiltInMethod.TRIM.method)}(
-          |  $leading, $trailing, ${terms(1)}, ${terms(2)})
-          |""".stripMargin
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
deleted file mode 100644
index bb52ad8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
-  * Describes a generated expression.
-  *
-  * @param resultTerm term to access the result of the expression
-  * @param nullTerm boolean term that indicates if expression is null
-  * @param code code necessary to produce resultTerm and nullTerm
-  * @param resultType type of the resultTerm
-  */
-case class GeneratedExpression(
-    resultTerm: String,
-    nullTerm: String,
-    code: String,
-    resultType: TypeInformation[_])
-
-object GeneratedExpression {
-  val ALWAYS_NULL = "true"
-  val NEVER_NULL = "false"
-  val NO_CODE = ""
-}
-
-case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
deleted file mode 100644
index b69ac1c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-package object codegen {
-  // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. We might
-  // have several parallel expression operators in one TaskManager, therefore we need to guard
-  // these operations.
-  object ReflectionLock
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
deleted file mode 100644
index 0a100dd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-/**
-  * Exception for all errors occurring during expression parsing.
-  */
-case class ExpressionParserException(msg: String) extends RuntimeException(msg)
-
-/**
-  * Exception for all errors occurring during sql parsing.
-  */
-case class SqlParserException(
-    msg: String,
-    cause: Throwable)
-  extends RuntimeException(msg, cause) {
-
-  def this(msg: String) = this(msg, null)
-
-}
-
-/**
-  * General Exception for all errors during table handling.
-  */
-case class TableException(
-    msg: String,
-    cause: Throwable)
-  extends RuntimeException(msg, cause) {
-
-  def this(msg: String) = this(msg, null)
-
-}
-
-object TableException {
-  def apply(msg: String): TableException = new TableException(msg)
-}
-
-/**
-  * Exception for all errors occurring during validation phase.
-  */
-case class ValidationException(
-    msg: String,
-    cause: Throwable)
-  extends RuntimeException(msg, cause) {
-
-  def this(msg: String) = this(msg, null)
-
-}
-
-object ValidationException {
-  def apply(msg: String): ValidationException = new ValidationException(msg)
-}
-
-/**
-  * Exception for unwanted method calling on unresolved expression.
-  */
-case class UnresolvedException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
deleted file mode 100644
index c284bd3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.trees.TreeNode
-import org.apache.flink.api.table.validate.{ValidationResult, ValidationSuccess}
-
-abstract class Expression extends TreeNode[Expression] {
-  /**
-    * Returns the [[TypeInformation]] for evaluating this expression.
-    * It is sometimes not available until the expression is valid.
-    */
-  private[flink] def resultType: TypeInformation[_]
-
-  /**
-    * One pass validation of the expression tree in post order.
-    */
-  private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess
-
-  private[flink] def childrenValid: Boolean = children.forall(_.valid)
-
-  /**
-    * Check input data types, inputs number or other properties specified by this expression.
-    * Return `ValidationSuccess` if it pass the check,
-    * or `ValidationFailure` with supplement message explaining the error.
-    * Note: we should only call this method until `childrenValid == true`
-    */
-  private[flink] def validateInput(): ValidationResult = ValidationSuccess
-
-  /**
-    * Convert Expression to its counterpart in Calcite, i.e. RexNode
-    */
-  private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    throw new UnsupportedOperationException(
-      s"${this.getClass.getName} cannot be transformed to RexNode"
-    )
-
-  private[flink] def checkEquals(other: Expression): Boolean = {
-    if (this.getClass != other.getClass) {
-      false
-    } else {
-      def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
-        elements1.length == elements2.length && elements1.zip(elements2).forall {
-          case (e1: Expression, e2: Expression) => e1.checkEquals(e2)
-          case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
-          case (i1, i2) => i1 == i2
-        }
-      }
-      val elements1 = this.productIterator.toSeq
-      val elements2 = other.productIterator.toSeq
-      checkEquality(elements1, elements2)
-    }
-  }
-}
-
-abstract class BinaryExpression extends Expression {
-  private[flink] def left: Expression
-  private[flink] def right: Expression
-  private[flink] def children = Seq(left, right)
-}
-
-abstract class UnaryExpression extends Expression {
-  private[flink] def child: Expression
-  private[flink] def children = Seq(child)
-}
-
-abstract class LeafExpression extends Expression {
-  private[flink] val children = Nil
-}