You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:08 UTC

[39/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
deleted file mode 100644
index a706309..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-import java.lang.reflect.{Field, Method}
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.calcite.util.BuiltInMethod
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
-import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
-
-object CodeGenUtils {
-
-  private val nameCounter = new AtomicInteger
-
-  def newName(name: String): String = {
-    s"$name$$${nameCounter.getAndIncrement}"
-  }
-
-  // when casting we first need to unbox Primitives, for example,
-  // float a = 1.0f;
-  // byte b = (byte) a;
-  // works, but for boxed types we need this:
-  // Float a = 1.0f;
-  // Byte b = (byte)(float) a;
-  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
-    case INT_TYPE_INFO => "int"
-    case LONG_TYPE_INFO => "long"
-    case SHORT_TYPE_INFO => "short"
-    case BYTE_TYPE_INFO => "byte"
-    case FLOAT_TYPE_INFO => "float"
-    case DOUBLE_TYPE_INFO => "double"
-    case BOOLEAN_TYPE_INFO => "boolean"
-    case CHAR_TYPE_INFO => "char"
-
-    // From PrimitiveArrayTypeInfo we would get class "int[]". Scala reflection
-    // does not seem to like this, so we manually give the correct type here.
-    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
-    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
-    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
-    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
-    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
-    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
-    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
-    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
-
-    // internal primitive representation of time points
-    case SqlTimeTypeInfo.DATE => "int"
-    case SqlTimeTypeInfo.TIME => "int"
-    case SqlTimeTypeInfo.TIMESTAMP => "long"
-
-    // internal primitive representation of time intervals
-    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "int"
-    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "long"
-
-    case _ =>
-      tpe.getTypeClass.getCanonicalName
-  }
-
-  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
-    // From PrimitiveArrayTypeInfo we would get class "int[]". Scala reflection
-    // does not seem to like this, so we manually give the correct type here.
-    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
-    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
-    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
-    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
-    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
-    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
-    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
-    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
-
-    case _ =>
-      tpe.getTypeClass.getCanonicalName
-  }
-
-  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
-    case INT_TYPE_INFO => "-1"
-    case LONG_TYPE_INFO => "-1L"
-    case SHORT_TYPE_INFO => "-1"
-    case BYTE_TYPE_INFO => "-1"
-    case FLOAT_TYPE_INFO => "-1.0f"
-    case DOUBLE_TYPE_INFO => "-1.0d"
-    case BOOLEAN_TYPE_INFO => "false"
-    case STRING_TYPE_INFO => "\"\""
-    case CHAR_TYPE_INFO => "'\\0'"
-    case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME => "-1"
-    case SqlTimeTypeInfo.TIMESTAMP => "-1L"
-    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "-1"
-    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "-1L"
-
-    case _ => "null"
-  }
-
-  def superPrimitive(typeInfo: TypeInformation[_]): String = typeInfo match {
-    case _: FractionalTypeInfo[_] => "double"
-    case _ => "long"
-  }
-
-  def qualifyMethod(method: Method): String =
-    method.getDeclaringClass.getCanonicalName + "." + method.getName
-
-  def qualifyEnum(enum: Enum[_]): String =
-    enum.getClass.getCanonicalName + "." + enum.name()
-
-  def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String) =
-    resultType match {
-      case SqlTimeTypeInfo.DATE =>
-        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)"
-      case SqlTimeTypeInfo.TIME =>
-        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIME.method)}($resultTerm)"
-      case SqlTimeTypeInfo.TIMESTAMP =>
-        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)"
-    }
-
-  def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String) =
-    resultType match {
-      case SqlTimeTypeInfo.DATE =>
-        s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)"
-      case SqlTimeTypeInfo.TIME =>
-        s"${qualifyMethod(BuiltInMethod.TIME_TO_INT.method)}($resultTerm)"
-      case SqlTimeTypeInfo.TIMESTAMP =>
-        s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
-    }
-
-  def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum)
-
-  def getEnum(genExpr: GeneratedExpression): Enum[_] = {
-    val split = genExpr.resultTerm.split('.')
-    val value = split.last
-    val clazz = genExpr.resultType.getTypeClass
-    enumValueOf(clazz, value)
-  }
-
-  def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
-    Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]
-
-  // ----------------------------------------------------------------------------------------------
-
-  def requireNumeric(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
-      throw new CodeGenException("Numeric expression type expected, but was " +
-        s"'${genExpr.resultType}'.")
-    }
-
-  def requireComparable(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isComparable(genExpr.resultType)) {
-      throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.")
-    }
-
-  def requireString(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isString(genExpr.resultType)) {
-      throw new CodeGenException("String expression type expected.")
-    }
-
-  def requireBoolean(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isBoolean(genExpr.resultType)) {
-      throw new CodeGenException("Boolean expression type expected.")
-    }
-
-  def requireTemporal(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isTemporal(genExpr.resultType)) {
-      throw new CodeGenException("Temporal expression type expected.")
-    }
-
-  def requireTimeInterval(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) {
-      throw new CodeGenException("Interval expression type expected.")
-    }
-
-  def requireArray(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isArray(genExpr.resultType)) {
-      throw new CodeGenException("Array expression type expected.")
-    }
-
-  def requireInteger(genExpr: GeneratedExpression) =
-    if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
-      throw new CodeGenException("Integer expression type expected.")
-    }
-
-  // ----------------------------------------------------------------------------------------------
-
-  def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
-
-  def isReference(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
-    case INT_TYPE_INFO
-         | LONG_TYPE_INFO
-         | SHORT_TYPE_INFO
-         | BYTE_TYPE_INFO
-         | FLOAT_TYPE_INFO
-         | DOUBLE_TYPE_INFO
-         | BOOLEAN_TYPE_INFO
-         | CHAR_TYPE_INFO => false
-    case _ => true
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  sealed abstract class FieldAccessor
-
-  case class ObjectFieldAccessor(field: Field) extends FieldAccessor
-
-  case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor
-
-  case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor
-
-  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
-
-  case class ProductAccessor(i: Int) extends FieldAccessor
-
-  def fieldAccessorFor(compType: CompositeType[_], index: Int): FieldAccessor = {
-    compType match {
-      case ri: RowTypeInfo =>
-        ProductAccessor(index)
-
-      case cc: CaseClassTypeInfo[_] =>
-        ObjectMethodAccessor(cc.getFieldNames()(index))
-
-      case javaTup: TupleTypeInfo[_] =>
-        ObjectGenericFieldAccessor("f" + index)
-
-      case pt: PojoTypeInfo[_] =>
-        val fieldName = pt.getFieldNames()(index)
-        getFieldAccessor(pt.getTypeClass, fieldName)
-
-      case _ => throw new CodeGenException("Unsupported composite type.")
-    }
-  }
-
-  def getFieldAccessor(clazz: Class[_], fieldName: String): FieldAccessor = {
-    val field = TypeExtractor.getDeclaredField(clazz, fieldName)
-    if (field.isAccessible) {
-      ObjectFieldAccessor(field)
-    }
-    else {
-      ObjectPrivateFieldAccessor(field)
-    }
-  }
-
-  def isFieldPrimitive(field: Field): Boolean = field.getType.isPrimitive
-
-  def reflectiveFieldReadAccess(fieldTerm: String, field: Field, objectTerm: String): String =
-    field.getType match {
-      case java.lang.Integer.TYPE => s"$fieldTerm.getInt($objectTerm)"
-      case java.lang.Long.TYPE => s"$fieldTerm.getLong($objectTerm)"
-      case java.lang.Short.TYPE => s"$fieldTerm.getShort($objectTerm)"
-      case java.lang.Byte.TYPE => s"$fieldTerm.getByte($objectTerm)"
-      case java.lang.Float.TYPE => s"$fieldTerm.getFloat($objectTerm)"
-      case java.lang.Double.TYPE => s"$fieldTerm.getDouble($objectTerm)"
-      case java.lang.Boolean.TYPE => s"$fieldTerm.getBoolean($objectTerm)"
-      case java.lang.Character.TYPE => s"$fieldTerm.getChar($objectTerm)"
-      case _ => s"(${field.getType.getCanonicalName}) $fieldTerm.get($objectTerm)"
-    }
-
-  def reflectiveFieldWriteAccess(
-      fieldTerm: String,
-      field: Field,
-      objectTerm: String,
-      valueTerm: String)
-    : String =
-    field.getType match {
-      case java.lang.Integer.TYPE => s"$fieldTerm.setInt($objectTerm, $valueTerm)"
-      case java.lang.Long.TYPE => s"$fieldTerm.setLong($objectTerm, $valueTerm)"
-      case java.lang.Short.TYPE => s"$fieldTerm.setShort($objectTerm, $valueTerm)"
-      case java.lang.Byte.TYPE => s"$fieldTerm.setByte($objectTerm, $valueTerm)"
-      case java.lang.Float.TYPE => s"$fieldTerm.setFloat($objectTerm, $valueTerm)"
-      case java.lang.Double.TYPE => s"$fieldTerm.setDouble($objectTerm, $valueTerm)"
-      case java.lang.Boolean.TYPE => s"$fieldTerm.setBoolean($objectTerm, $valueTerm)"
-      case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm, $valueTerm)"
-      case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
deleted file mode 100644
index cdb3753..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ /dev/null
@@ -1,1522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-import java.math.{BigDecimal => JBigDecimal}
-
-import org.apache.calcite.avatica.util.DateTimeUtils
-import org.apache.calcite.rex._
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction, Function, MapFunction}
-import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
-import org.apache.flink.api.table.codegen.Indenter.toISC
-import org.apache.flink.api.table.codegen.calls.FunctionGenerator
-import org.apache.flink.api.table.codegen.calls.ScalarOperators._
-import org.apache.flink.api.table.functions.UserDefinedFunction
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-
-/**
-  * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s.
-  *
-  * @param config configuration that determines runtime behavior
-  * @param nullableInput input(s) can be null.
-  * @param input1 type information about the first input of the Function
-  * @param input2 type information about the second input if the Function is binary
-  * @param input1PojoFieldMapping additional mapping information if input1 is a POJO (POJO types
-  *                              have no deterministic field order).
-  * @param input2PojoFieldMapping additional mapping information if input2 is a POJO (POJO types
-  *                              have no deterministic field order).
-  *
-  */
-class CodeGenerator(
-   config: TableConfig,
-   nullableInput: Boolean,
-   input1: TypeInformation[Any],
-   input2: Option[TypeInformation[Any]] = None,
-   input1PojoFieldMapping: Option[Array[Int]] = None,
-   input2PojoFieldMapping: Option[Array[Int]] = None)
-  extends RexVisitor[GeneratedExpression] {
-
-  // check if nullCheck is enabled when inputs can be null
-  if (nullableInput && !config.getNullCheck) {
-    throw new CodeGenException("Null check must be enabled if entire rows can be null.")
-  }
-
-  // check for POJO input1 mapping
-  input1 match {
-    case pt: PojoTypeInfo[_] =>
-      input1PojoFieldMapping.getOrElse(
-        throw new CodeGenException("No input mapping is specified for input1 of type POJO."))
-    case _ => // ok
-  }
-
-  // check for POJO input2 mapping
-  input2 match {
-    case Some(pt: PojoTypeInfo[_]) =>
-      input2PojoFieldMapping.getOrElse(
-        throw new CodeGenException("No input mapping is specified for input2 of type POJO."))
-    case _ => // ok
-  }
-
-  /**
-    * A code generator for generating unary Flink
-    * [[org.apache.flink.api.common.functions.Function]]s with one input.
-    *
-    * @param config configuration that determines runtime behavior
-    * @param nullableInput input(s) can be null.
-    * @param input type information about the input of the Function
-    * @param inputPojoFieldMapping additional mapping information necessary if input is a
-    *                              POJO (POJO types have no deterministic field order).
-    */
-  def this(
-      config: TableConfig,
-      nullableInput: Boolean,
-      input: TypeInformation[Any],
-      inputPojoFieldMapping: Array[Int]) =
-    this(config, nullableInput, input, None, Some(inputPojoFieldMapping))
-
-  /**
-    * A code generator for generating Flink input formats.
-    *
-    * @param config configuration that determines runtime behavior
-    */
-  def this(config: TableConfig) =
-    this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None)
-
-  // set of member statements that will be added only once
-  // we use a LinkedHashSet to keep the insertion order
-  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
-
-  // set of constructor statements that will be added only once
-  // we use a LinkedHashSet to keep the insertion order
-  private val reusableInitStatements = mutable.LinkedHashSet[String]()
-
-  // set of statements that will be added only once per record
-  // we use a LinkedHashSet to keep the insertion order
-  private val reusablePerRecordStatements = mutable.LinkedHashSet[String]()
-
-  // map of initial input unboxing expressions that will be added only once
-  // (inputTerm, index) -> expr
-  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
-
-  /**
-    * @return code block of statements that need to be placed in the member area of the Function
-    *         (e.g. member variables and their initialization)
-    */
-  def reuseMemberCode(): String = {
-    reusableMemberStatements.mkString("", "\n", "\n")
-  }
-
-  /**
-    * @return code block of statements that need to be placed in the constructor of the Function
-    */
-  def reuseInitCode(): String = {
-    reusableInitStatements.mkString("", "\n", "\n")
-  }
-
-  /**
-    * @return code block of statements that need to be placed in the SAM of the Function
-    */
-  def reusePerRecordCode(): String = {
-    reusablePerRecordStatements.mkString("", "\n", "\n")
-  }
-
-  /**
-    * @return code block of statements that unbox input variables to a primitive variable
-    *         and a corresponding null flag variable
-    */
-  def reuseInputUnboxingCode(): String = {
-    reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
-  }
-
-  /**
-    * @return term of the (casted and possibly boxed) first input
-    */
-  var input1Term = "in1"
-
-  /**
-    * @return term of the (casted and possibly boxed) second input
-    */
-  var input2Term = "in2"
-
-  /**
-    * @return term of the (casted) output collector
-    */
-  var collectorTerm = "c"
-
-  /**
-    * @return term of the output record (possibly defined in the member area e.g. Row, Tuple)
-    */
-  var outRecordTerm = "out"
-
-  /**
-    * @return whether null checking is enabled
-    */
-  def nullCheck: Boolean = config.getNullCheck
-
-  /**
-    * Generates an expression from a RexNode. If objects or variables can be reused, they will be
-    * added to reusable code sections internally.
-    *
-    * @param rex Calcite row expression
-    * @return instance of GeneratedExpression
-    */
-  def generateExpression(rex: RexNode): GeneratedExpression = {
-    rex.accept(this)
-  }
-
-  /**
-    * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java
-    * compiler.
-    *
-    * @param name Class name of the Function. Does not have to be unique, but must be a valid
-    *             Java class identifier.
-    * @param clazz Flink Function to be generated.
-    * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
-    *                 output record can be accessed via the given term methods.
-    * @param returnType expected return type
-    * @tparam T Flink Function to be generated.
-    * @return instance of GeneratedFunction
-    */
-  def generateFunction[T <: Function](
-      name: String,
-      clazz: Class[T],
-      bodyCode: String,
-      returnType: TypeInformation[Any])
-    : GeneratedFunction[T] = {
-    val funcName = newName(name)
-
-    // Janino does not support generics, that's why we need
-    // manual casting here
-    val samHeader =
-      // FlatMapFunction
-      if (clazz == classOf[FlatMapFunction[_,_]]) {
-        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
-        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
-          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
-      }
-
-      // MapFunction
-      else if (clazz == classOf[MapFunction[_,_]]) {
-        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
-        ("Object map(Object _in1)",
-          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
-      }
-
-      // FlatJoinFunction
-      else if (clazz == classOf[FlatJoinFunction[_,_,_]]) {
-        val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
-        val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
-            throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
-        (s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
-          List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
-          s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
-      }
-      else {
-        // TODO more functions
-        throw new CodeGenException("Unsupported Function.")
-      }
-
-    val funcCode = j"""
-      public class $funcName
-          implements ${clazz.getCanonicalName} {
-
-        ${reuseMemberCode()}
-
-        public $funcName() throws Exception {
-          ${reuseInitCode()}
-        }
-
-        @Override
-        public ${samHeader._1} throws Exception {
-          ${samHeader._2.mkString("\n")}
-          ${reusePerRecordCode()}
-          ${reuseInputUnboxingCode()}
-          $bodyCode
-        }
-      }
-    """.stripMargin
-
-    GeneratedFunction(funcName, returnType, funcCode)
-  }
-
-  /**
-    * Generates a values input format that can be passed to Java compiler.
-    *
-    * @param name Class name of the input format. Does not have to be unique, but must be a
-    *             valid Java class identifier.
-    * @param records code for creating records
-    * @param returnType expected return type
-    * @tparam T type argument of the generated [[GenericInputFormat]].
-    * @return instance of GeneratedFunction
-    */
-  def generateValuesInputFormat[T](
-      name: String,
-      records: Seq[String],
-      returnType: TypeInformation[Any])
-    : GeneratedFunction[GenericInputFormat[T]] = {
-    val funcName = newName(name)
-
-    addReusableOutRecord(returnType)
-
-    val funcCode = j"""
-      public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
-
-        private int nextIdx = 0;
-
-        ${reuseMemberCode()}
-
-        public $funcName() throws Exception {
-          ${reuseInitCode()}
-        }
-
-        @Override
-        public boolean reachedEnd() throws java.io.IOException {
-          return nextIdx >= ${records.length};
-        }
-
-        @Override
-        public Object nextRecord(Object reuse) {
-          switch (nextIdx) {
-            ${records.zipWithIndex.map { case (r, i) =>
-              s"""
-                 |case $i:
-                 |  $r
-                 |break;
-               """.stripMargin
-            }.mkString("\n")}
-          }
-          nextIdx++;
-          return $outRecordTerm;
-        }
-      }
-    """.stripMargin
-
-    GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode)
-  }
-
-  /**
-    * Generates an expression that converts the first input (and second input) into the given type.
-    * If two inputs are converted, the second input is appended. If objects or variables can
-    * be reused, they will be added to reusable code sections internally. The evaluation result
-    * may be stored in the global result variable (see [[outRecordTerm]]).
-    *
-    * @param returnType conversion target type. Inputs and output must have the same arity.
-    * @param resultFieldNames result field names necessary for a mapping to POJO fields.
-    * @return instance of GeneratedExpression
-    */
-  def generateConverterResultExpression(
-      returnType: TypeInformation[_ <: Any],
-      resultFieldNames: Seq[String])
-    : GeneratedExpression = {
-    val input1AccessExprs = for (i <- 0 until input1.getArity)
-      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
-
-    val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity)
-        yield generateInputAccess(ti, input2Term, i, input2PojoFieldMapping)
-      case None => Seq() // add nothing
-    }
-
-    generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType, resultFieldNames)
-  }
-
-  /**
-    * Generates an expression from the left input and the right table function.
-    */
-  def generateCorrelateAccessExprs: (Seq[GeneratedExpression], Seq[GeneratedExpression]) = {
-    val input1AccessExprs = for (i <- 0 until input1.getArity)
-      yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping)
-
-    val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity)
-        // use generateFieldAccess instead of generateInputAccess so that the generated table
-        // function's field access code is not put at the top of the function body but inside
-        // the while loop
-        yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping)
-      case None => throw new CodeGenException("Type information of input2 must not be null.")
-    }
-    (input1AccessExprs, input2AccessExprs)
-  }
-
-  /**
-    * Generates an expression from a sequence of RexNode. If objects or variables can be reused,
-    * they will be added to reusable code sections internally. The evaluation result
-    * may be stored in the global result variable (see [[outRecordTerm]]).
-    *
-    * @param returnType conversion target type. Type must have the same arity as rexNodes.
-    * @param resultFieldNames result field names necessary for a mapping to POJO fields.
-    * @param rexNodes sequence of RexNode to be converted
-    * @return instance of GeneratedExpression
-    */
-  def generateResultExpression(
-      returnType: TypeInformation[_ <: Any],
-      resultFieldNames: Seq[String],
-      rexNodes: Seq[RexNode])
-    : GeneratedExpression = {
-    val fieldExprs = rexNodes.map(generateExpression)
-    generateResultExpression(fieldExprs, returnType, resultFieldNames)
-  }
-
-  /**
-    * Generates an expression from a sequence of other expressions. If objects or variables can
-    * be reused, they will be added to reusable code sections internally. The evaluation result
-    * may be stored in the global result variable (see [[outRecordTerm]]).
-    *
-    * @param fieldExprs field expressions to be converted
-    * @param returnType conversion target type. Type must have the same arity as fieldExprs.
-    * @param resultFieldNames result field names necessary for a mapping to POJO fields.
-    * @return instance of GeneratedExpression
-    */
-  def generateResultExpression(
-      fieldExprs: Seq[GeneratedExpression],
-      returnType: TypeInformation[_ <: Any],
-      resultFieldNames: Seq[String])
-    : GeneratedExpression = {
-    // initial type check
-    if (returnType.getArity != fieldExprs.length) {
-      throw new CodeGenException("Arity of result type does not match number of expressions.")
-    }
-    if (resultFieldNames.length != fieldExprs.length) {
-      throw new CodeGenException("Arity of result field names does not match number of " +
-        "expressions.")
-    }
-    // type check
-    returnType match {
-      case pt: PojoTypeInfo[_] =>
-        fieldExprs.zipWithIndex foreach {
-          case (fieldExpr, i) if fieldExpr.resultType != pt.getTypeAt(resultFieldNames(i)) =>
-            throw new CodeGenException("Incompatible types of expression and result type.")
-
-          case _ => // ok
-        }
-
-      case ct: CompositeType[_] =>
-        fieldExprs.zipWithIndex foreach {
-          case (fieldExpr, i) if fieldExpr.resultType != ct.getTypeAt(i) =>
-            throw new CodeGenException("Incompatible types of expression and result type.")
-          case _ => // ok
-        }
-
-      case at: AtomicType[_] if at != fieldExprs.head.resultType =>
-        throw new CodeGenException("Incompatible types of expression and result type.")
-
-      case _ => // ok
-    }
-
-    val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
-    val boxedFieldExprs = fieldExprs.map(generateOutputFieldBoxing)
-
-    // generate result expression
-    returnType match {
-      case ri: RowTypeInfo =>
-        addReusableOutRecord(ri)
-        val resultSetters: String = boxedFieldExprs.zipWithIndex map {
-          case (fieldExpr, i) =>
-            if (nullCheck) {
-              s"""
-              |${fieldExpr.code}
-              |if (${fieldExpr.nullTerm}) {
-              |  $outRecordTerm.setField($i, null);
-              |}
-              |else {
-              |  $outRecordTerm.setField($i, ${fieldExpr.resultTerm});
-              |}
-              |""".stripMargin
-            }
-            else {
-              s"""
-              |${fieldExpr.code}
-              |$outRecordTerm.setField($i, ${fieldExpr.resultTerm});
-              |""".stripMargin
-            }
-        } mkString "\n"
-
-        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
-
-      case pt: PojoTypeInfo[_] =>
-        addReusableOutRecord(pt)
-        val resultSetters: String = boxedFieldExprs.zip(resultFieldNames) map {
-          case (fieldExpr, fieldName) =>
-            val accessor = getFieldAccessor(pt.getTypeClass, fieldName)
-
-            accessor match {
-              // Reflective access of primitives/Objects
-              case ObjectPrivateFieldAccessor(field) =>
-                val fieldTerm = addReusablePrivateFieldAccess(pt.getTypeClass, fieldName)
-
-                val defaultIfNull = if (isFieldPrimitive(field)) {
-                  primitiveDefaultValue(fieldExpr.resultType)
-                } else {
-                  "null"
-                }
-
-                if (nullCheck) {
-                  s"""
-                    |${fieldExpr.code}
-                    |if (${fieldExpr.nullTerm}) {
-                    |  ${reflectiveFieldWriteAccess(
-                          fieldTerm,
-                          field,
-                          outRecordTerm,
-                          defaultIfNull)};
-                    |}
-                    |else {
-                    |  ${reflectiveFieldWriteAccess(
-                          fieldTerm,
-                          field,
-                          outRecordTerm,
-                          fieldExpr.resultTerm)};
-                    |}
-                    |""".stripMargin
-                }
-                else {
-                  s"""
-                    |${fieldExpr.code}
-                    |${reflectiveFieldWriteAccess(
-                          fieldTerm,
-                          field,
-                          outRecordTerm,
-                          fieldExpr.resultTerm)};
-                    |""".stripMargin
-                }
-
-              // primitive or Object field access (implicit boxing)
-              case _ =>
-                if (nullCheck) {
-                  s"""
-                    |${fieldExpr.code}
-                    |if (${fieldExpr.nullTerm}) {
-                    |  $outRecordTerm.$fieldName = null;
-                    |}
-                    |else {
-                    |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
-                    |}
-                    |""".stripMargin
-                }
-                else {
-                  s"""
-                    |${fieldExpr.code}
-                    |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
-                    |""".stripMargin
-                }
-              }
-          } mkString "\n"
-
-        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
-
-      case tup: TupleTypeInfo[_] =>
-        addReusableOutRecord(tup)
-        val resultSetters: String = boxedFieldExprs.zipWithIndex map {
-          case (fieldExpr, i) =>
-            val fieldName = "f" + i
-            if (nullCheck) {
-              s"""
-                |${fieldExpr.code}
-                |if (${fieldExpr.nullTerm}) {
-                |  throw new NullPointerException("Null result cannot be stored in a Tuple.");
-                |}
-                |else {
-                |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
-                |}
-                |""".stripMargin
-            }
-            else {
-              s"""
-                |${fieldExpr.code}
-                |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
-                |""".stripMargin
-            }
-        } mkString "\n"
-
-        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
-
-      case cc: CaseClassTypeInfo[_] =>
-        val fieldCodes: String = boxedFieldExprs.map(_.code).mkString("\n")
-        val constructorParams: String = boxedFieldExprs.map(_.resultTerm).mkString(", ")
-        val resultTerm = newName(outRecordTerm)
-
-        val nullCheckCode = if (nullCheck) {
-        boxedFieldExprs map { (fieldExpr) =>
-          s"""
-              |if (${fieldExpr.nullTerm}) {
-              |  throw new NullPointerException("Null result cannot be stored in a Case Class.");
-              |}
-              |""".stripMargin
-          } mkString "\n"
-        } else {
-          ""
-        }
-
-        val resultCode =
-          s"""
-            |$fieldCodes
-            |$nullCheckCode
-            |$returnTypeTerm $resultTerm = new $returnTypeTerm($constructorParams);
-            |""".stripMargin
-
-        GeneratedExpression(resultTerm, "false", resultCode, returnType)
-
-      case a: AtomicType[_] =>
-        val fieldExpr = boxedFieldExprs.head
-        val nullCheckCode = if (nullCheck) {
-          s"""
-          |if (${fieldExpr.nullTerm}) {
-          |  throw new NullPointerException("Null result cannot be used for atomic types.");
-          |}
-          |""".stripMargin
-        } else {
-          ""
-        }
-        val resultCode =
-          s"""
-            |${fieldExpr.code}
-            |$nullCheckCode
-            |""".stripMargin
-
-        GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, returnType)
-
-      case _ =>
-        throw new CodeGenException(s"Unsupported result type: $returnType")
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------
-  // RexVisitor methods
-  // ----------------------------------------------------------------------------------------------
-
-  override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
-    // if inputRef index is within size of input1 we work with input1, input2 otherwise
-    val input = if (inputRef.getIndex < input1.getArity) {
-      (input1, input1Term, input1PojoFieldMapping)
-    } else {
-      (input2.getOrElse(throw new CodeGenException("Invalid input access.")),
-        input2Term,
-        input2PojoFieldMapping)
-    }
-
-    val index = if (input._2 == input1Term) {
-      inputRef.getIndex
-    } else {
-      inputRef.getIndex - input1.getArity
-    }
-
-    generateInputAccess(input._1, input._2, index, input._3)
-  }
-
-  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = {
-    val refExpr = rexFieldAccess.getReferenceExpr.accept(this)
-    val index = rexFieldAccess.getField.getIndex
-    val fieldAccessExpr = generateFieldAccess(
-      refExpr.resultType,
-      refExpr.resultTerm,
-      index,
-      input1PojoFieldMapping)
-
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldAccessExpr.resultType)
-    val defaultValue = primitiveDefaultValue(fieldAccessExpr.resultType)
-    val resultCode = if (nullCheck) {
-      s"""
-        |${refExpr.code}
-        |$resultTypeTerm $resultTerm;
-        |boolean $nullTerm;
-        |if (${refExpr.nullTerm}) {
-        |  $resultTerm = $defaultValue;
-        |  $nullTerm = true;
-        |}
-        |else {
-        |  ${fieldAccessExpr.code}
-        |  $resultTerm = ${fieldAccessExpr.resultTerm};
-        |  $nullTerm = ${fieldAccessExpr.nullTerm};
-        |}
-        |""".stripMargin
-    } else {
-      s"""
-        |${refExpr.code}
-        |${fieldAccessExpr.code}
-        |$resultTypeTerm $resultTerm = ${fieldAccessExpr.resultTerm};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, resultCode, fieldAccessExpr.resultType)
-  }
-
-  override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
-    val resultType = FlinkTypeFactory.toTypeInfo(literal.getType)
-    val value = literal.getValue3
-    // null value with type
-    if (value == null) {
-      return generateNullLiteral(resultType)
-    }
-    // non-null values
-    literal.getType.getSqlTypeName match {
-
-      case BOOLEAN =>
-        generateNonNullLiteral(resultType, literal.getValue3.toString)
-
-      case TINYINT =>
-        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
-        if (decimal.isValidByte) {
-          generateNonNullLiteral(resultType, decimal.byteValue().toString)
-        }
-        else {
-          throw new CodeGenException("Decimal can not be converted to byte.")
-        }
-
-      case SMALLINT =>
-        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
-        if (decimal.isValidShort) {
-          generateNonNullLiteral(resultType, decimal.shortValue().toString)
-        }
-        else {
-          throw new CodeGenException("Decimal can not be converted to short.")
-        }
-
-      case INTEGER =>
-        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
-        if (decimal.isValidInt) {
-          generateNonNullLiteral(resultType, decimal.intValue().toString)
-        }
-        else {
-          throw new CodeGenException("Decimal can not be converted to integer.")
-        }
-
-      case BIGINT =>
-        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
-        if (decimal.isValidLong) {
-          generateNonNullLiteral(resultType, decimal.longValue().toString + "L")
-        }
-        else {
-          throw new CodeGenException("Decimal can not be converted to long.")
-        }
-
-      case FLOAT =>
-        val floatValue = value.asInstanceOf[JBigDecimal].floatValue()
-        floatValue match {
-          case Float.NaN => generateNonNullLiteral(resultType, "java.lang.Float.NaN")
-          case Float.NegativeInfinity =>
-            generateNonNullLiteral(resultType, "java.lang.Float.NEGATIVE_INFINITY")
-          case Float.PositiveInfinity =>
-            generateNonNullLiteral(resultType, "java.lang.Float.POSITIVE_INFINITY")
-          case _ => generateNonNullLiteral(resultType, floatValue.toString + "f")
-        }
-
-      case DOUBLE =>
-        val doubleValue = value.asInstanceOf[JBigDecimal].doubleValue()
-        doubleValue match {
-          case Double.NaN => generateNonNullLiteral(resultType, "java.lang.Double.NaN")
-          case Double.NegativeInfinity =>
-            generateNonNullLiteral(resultType, "java.lang.Double.NEGATIVE_INFINITY")
-          case Double.PositiveInfinity =>
-            generateNonNullLiteral(resultType, "java.lang.Double.POSITIVE_INFINITY")
-          case _ => generateNonNullLiteral(resultType, doubleValue.toString + "d")
-        }
-      case DECIMAL =>
-        val decimalField = addReusableDecimal(value.asInstanceOf[JBigDecimal])
-        generateNonNullLiteral(resultType, decimalField)
-
-      case VARCHAR | CHAR =>
-        generateNonNullLiteral(resultType, "\"" + value.toString + "\"")
-
-      case SYMBOL =>
-        generateSymbol(value.asInstanceOf[Enum[_]])
-
-      case DATE =>
-        generateNonNullLiteral(resultType, value.toString)
-
-      case TIME =>
-        generateNonNullLiteral(resultType, value.toString)
-
-      case TIMESTAMP =>
-        generateNonNullLiteral(resultType, value.toString + "L")
-
-      case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
-        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
-        if (decimal.isValidInt) {
-          generateNonNullLiteral(resultType, decimal.intValue().toString)
-        } else {
-          throw new CodeGenException("Decimal can not be converted to interval of months.")
-        }
-
-      case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
-        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
-        if (decimal.isValidLong) {
-          generateNonNullLiteral(resultType, decimal.longValue().toString + "L")
-        } else {
-          throw new CodeGenException("Decimal can not be converted to interval of milliseconds.")
-        }
-
-      case t@_ =>
-        throw new CodeGenException(s"Type not supported: $t")
-    }
-  }
-
-  override def visitCorrelVariable(correlVariable: RexCorrelVariable): GeneratedExpression = {
-    GeneratedExpression(input1Term, NEVER_NULL, NO_CODE, input1)
-  }
-
-  override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression =
-    throw new CodeGenException("Local variables are not supported yet.")
-
-  override def visitRangeRef(rangeRef: RexRangeRef): GeneratedExpression =
-    throw new CodeGenException("Range references are not supported yet.")
-
-  override def visitDynamicParam(dynamicParam: RexDynamicParam): GeneratedExpression =
-    throw new CodeGenException("Dynamic parameter references are not supported yet.")
-
-  override def visitCall(call: RexCall): GeneratedExpression = {
-    val operands = call.getOperands.map(_.accept(this))
-    val resultType = FlinkTypeFactory.toTypeInfo(call.getType)
-
-    call.getOperator match {
-      // arithmetic
-      case PLUS if isNumeric(resultType) =>
-        val left = operands.head
-        val right = operands(1)
-        requireNumeric(left)
-        requireNumeric(right)
-        generateArithmeticOperator("+", nullCheck, resultType, left, right)
-
-      case PLUS | DATETIME_PLUS if isTemporal(resultType) =>
-        val left = operands.head
-        val right = operands(1)
-        requireTemporal(left)
-        requireTemporal(right)
-        generateTemporalPlusMinus(plus = true, nullCheck, left, right)
-
-      case MINUS if isNumeric(resultType) =>
-        val left = operands.head
-        val right = operands(1)
-        requireNumeric(left)
-        requireNumeric(right)
-        generateArithmeticOperator("-", nullCheck, resultType, left, right)
-
-      case MINUS | MINUS_DATE if isTemporal(resultType) =>
-        val left = operands.head
-        val right = operands(1)
-        requireTemporal(left)
-        requireTemporal(right)
-        generateTemporalPlusMinus(plus = false, nullCheck, left, right)
-
-      case MULTIPLY if isNumeric(resultType) =>
-        val left = operands.head
-        val right = operands(1)
-        requireNumeric(left)
-        requireNumeric(right)
-        generateArithmeticOperator("*", nullCheck, resultType, left, right)
-
-      case DIVIDE | DIVIDE_INTEGER if isNumeric(resultType) =>
-        val left = operands.head
-        val right = operands(1)
-        requireNumeric(left)
-        requireNumeric(right)
-        generateArithmeticOperator("/", nullCheck, resultType, left, right)
-
-      case MOD if isNumeric(resultType) =>
-        val left = operands.head
-        val right = operands(1)
-        requireNumeric(left)
-        requireNumeric(right)
-        generateArithmeticOperator("%", nullCheck, resultType, left, right)
-
-      case UNARY_MINUS if isNumeric(resultType) =>
-        val operand = operands.head
-        requireNumeric(operand)
-        generateUnaryArithmeticOperator("-", nullCheck, resultType, operand)
-
-      case UNARY_MINUS if isTimeInterval(resultType) =>
-        val operand = operands.head
-        requireTimeInterval(operand)
-        generateUnaryIntervalPlusMinus(plus = false, nullCheck, operand)
-
-      case UNARY_PLUS if isNumeric(resultType) =>
-        val operand = operands.head
-        requireNumeric(operand)
-        generateUnaryArithmeticOperator("+", nullCheck, resultType, operand)
-
-      case UNARY_PLUS if isTimeInterval(resultType) =>
-        val operand = operands.head
-        requireTimeInterval(operand)
-        generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand)
-
-      // comparison
-      case EQUALS =>
-        val left = operands.head
-        val right = operands(1)
-        generateEquals(nullCheck, left, right)
-
-      case NOT_EQUALS =>
-        val left = operands.head
-        val right = operands(1)
-        generateNotEquals(nullCheck, left, right)
-
-      case GREATER_THAN =>
-        val left = operands.head
-        val right = operands(1)
-        requireComparable(left)
-        requireComparable(right)
-        generateComparison(">", nullCheck, left, right)
-
-      case GREATER_THAN_OR_EQUAL =>
-        val left = operands.head
-        val right = operands(1)
-        requireComparable(left)
-        requireComparable(right)
-        generateComparison(">=", nullCheck, left, right)
-
-      case LESS_THAN =>
-        val left = operands.head
-        val right = operands(1)
-        requireComparable(left)
-        requireComparable(right)
-        generateComparison("<", nullCheck, left, right)
-
-      case LESS_THAN_OR_EQUAL =>
-        val left = operands.head
-        val right = operands(1)
-        requireComparable(left)
-        requireComparable(right)
-        generateComparison("<=", nullCheck, left, right)
-
-      case IS_NULL =>
-        val operand = operands.head
-        generateIsNull(nullCheck, operand)
-
-      case IS_NOT_NULL =>
-        val operand = operands.head
-        generateIsNotNull(nullCheck, operand)
-
-      // logic
-      case AND =>
-        operands.reduceLeft { (left: GeneratedExpression, right: GeneratedExpression) =>
-          requireBoolean(left)
-          requireBoolean(right)
-          generateAnd(nullCheck, left, right)
-        }
-
-      case OR =>
-        operands.reduceLeft { (left: GeneratedExpression, right: GeneratedExpression) =>
-          requireBoolean(left)
-          requireBoolean(right)
-          generateOr(nullCheck, left, right)
-        }
-
-      case NOT =>
-        val operand = operands.head
-        requireBoolean(operand)
-        generateNot(nullCheck, operand)
-
-      case CASE =>
-        generateIfElse(nullCheck, operands, resultType)
-
-      case IS_TRUE =>
-        val operand = operands.head
-        requireBoolean(operand)
-        generateIsTrue(operand)
-
-      case IS_NOT_TRUE =>
-        val operand = operands.head
-        requireBoolean(operand)
-        generateIsNotTrue(operand)
-
-      case IS_FALSE =>
-        val operand = operands.head
-        requireBoolean(operand)
-        generateIsFalse(operand)
-
-      case IS_NOT_FALSE =>
-        val operand = operands.head
-        requireBoolean(operand)
-        generateIsNotFalse(operand)
-
-      // casting
-      case CAST | REINTERPRET =>
-        val operand = operands.head
-        generateCast(nullCheck, operand, resultType)
-
-      // as / renaming
-      case AS =>
-        operands.head
-
-      // string arithmetic
-      case CONCAT =>
-        val left = operands.head
-        val right = operands(1)
-        requireString(left)
-        generateArithmeticOperator("+", nullCheck, resultType, left, right)
-
-      // arrays
-      case ARRAY_VALUE_CONSTRUCTOR =>
-        generateArray(this, resultType, operands)
-
-      case ITEM =>
-        val array = operands.head
-        val index = operands(1)
-        requireArray(array)
-        requireInteger(index)
-        generateArrayElementAt(this, array, index)
-
-      case CARDINALITY =>
-        val array = operands.head
-        requireArray(array)
-        generateArrayCardinality(nullCheck, array)
-
-      case ELEMENT =>
-        val array = operands.head
-        requireArray(array)
-        generateArrayElement(this, array)
-
-      // advanced scalar functions
-      case sqlOperator: SqlOperator =>
-        val callGen = FunctionGenerator.getCallGenerator(
-          sqlOperator,
-          operands.map(_.resultType),
-          resultType)
-        callGen
-          .getOrElse(throw new CodeGenException(s"Unsupported call: $sqlOperator \n" +
-            s"If you think this function should be supported, " +
-            s"you can create an issue and start a discussion for it."))
-          .generate(this, operands)
-
-      // unknown or invalid
-      case call@_ =>
-        throw new CodeGenException(s"Unsupported call: $call")
-    }
-  }
-
-  override def visitOver(over: RexOver): GeneratedExpression =
-    throw new CodeGenException("Aggregate functions over windows are not supported yet.")
-
-  override def visitSubQuery(subQuery: RexSubQuery): GeneratedExpression =
-    throw new CodeGenException("Subqueries are not supported yet.")
-
-  // ----------------------------------------------------------------------------------------------
-  // generator helping methods
-  // ----------------------------------------------------------------------------------------------
-
-  private def generateInputAccess(
-      inputType: TypeInformation[Any],
-      inputTerm: String,
-      index: Int,
-      pojoFieldMapping: Option[Array[Int]])
-    : GeneratedExpression = {
-    // if input has been used before, we can reuse the code that
-    // has already been generated
-    val inputExpr = reusableInputUnboxingExprs.get((inputTerm, index)) match {
-      // input access and unboxing has already been generated
-      case Some(expr) =>
-        expr
-
-      // generate input access and unboxing if necessary
-      case None =>
-        val expr = if (nullableInput) {
-          generateNullableInputFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
-        } else {
-          generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
-        }
-
-        reusableInputUnboxingExprs((inputTerm, index)) = expr
-        expr
-    }
-    // hide the generated code as it will be executed only once
-    GeneratedExpression(inputExpr.resultTerm, inputExpr.nullTerm, "", inputExpr.resultType)
-  }
-
-  private def generateNullableInputFieldAccess(
-      inputType: TypeInformation[Any],
-      inputTerm: String,
-      index: Int,
-      pojoFieldMapping: Option[Array[Int]])
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-
-    val fieldType = inputType match {
-      case ct: CompositeType[_] =>
-        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
-          pojoFieldMapping.get(index)
-        }
-        else {
-          index
-        }
-        ct.getTypeAt(fieldIndex)
-      case at: AtomicType[_] => at
-      case _ => throw new CodeGenException("Unsupported type for input field access.")
-    }
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
-    val defaultValue = primitiveDefaultValue(fieldType)
-    val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
-
-    val inputCheckCode =
-      s"""
-        |$resultTypeTerm $resultTerm;
-        |boolean $nullTerm;
-        |if ($inputTerm == null) {
-        |  $resultTerm = $defaultValue;
-        |  $nullTerm = true;
-        |}
-        |else {
-        |  ${fieldAccessExpr.code}
-        |  $resultTerm = ${fieldAccessExpr.resultTerm};
-        |  $nullTerm = ${fieldAccessExpr.nullTerm};
-        |}
-        |""".stripMargin
-
-    GeneratedExpression(resultTerm, nullTerm, inputCheckCode, fieldType)
-  }
-
-  private def generateFieldAccess(
-      inputType: TypeInformation[_],
-      inputTerm: String,
-      index: Int,
-      pojoFieldMapping: Option[Array[Int]])
-    : GeneratedExpression = {
-    inputType match {
-      case ct: CompositeType[_] =>
-        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]] && pojoFieldMapping.nonEmpty) {
-          pojoFieldMapping.get(index)
-        }
-        else {
-          index
-        }
-        val accessor = fieldAccessorFor(ct, fieldIndex)
-        val fieldType: TypeInformation[Any] = ct.getTypeAt(fieldIndex)
-        val fieldTypeTerm = boxedTypeTermForTypeInfo(fieldType)
-
-        accessor match {
-          case ObjectFieldAccessor(field) =>
-            // primitive
-            if (isFieldPrimitive(field)) {
-              generateNonNullLiteral(fieldType, s"$inputTerm.${field.getName}")
-            }
-            // Object
-            else {
-              generateInputFieldUnboxing(
-                fieldType,
-                s"($fieldTypeTerm) $inputTerm.${field.getName}")
-            }
-
-          case ObjectGenericFieldAccessor(fieldName) =>
-            // Object
-            val inputCode = s"($fieldTypeTerm) $inputTerm.$fieldName"
-            generateInputFieldUnboxing(fieldType, inputCode)
-
-          case ObjectMethodAccessor(methodName) =>
-            // Object
-            val inputCode = s"($fieldTypeTerm) $inputTerm.$methodName()"
-            generateInputFieldUnboxing(fieldType, inputCode)
-
-          case ProductAccessor(i) =>
-            // Object
-            val inputCode = s"($fieldTypeTerm) $inputTerm.getField($i)"
-            generateInputFieldUnboxing(fieldType, inputCode)
-
-          case ObjectPrivateFieldAccessor(field) =>
-            val fieldTerm = addReusablePrivateFieldAccess(ct.getTypeClass, field.getName)
-            val reflectiveAccessCode = reflectiveFieldReadAccess(fieldTerm, field, inputTerm)
-            // primitive
-            if (isFieldPrimitive(field)) {
-              generateNonNullLiteral(fieldType, reflectiveAccessCode)
-            }
-            // Object
-            else {
-              generateInputFieldUnboxing(fieldType, reflectiveAccessCode)
-            }
-        }
-
-      case at: AtomicType[_] =>
-        val fieldTypeTerm = boxedTypeTermForTypeInfo(at)
-        val inputCode = s"($fieldTypeTerm) $inputTerm"
-        generateInputFieldUnboxing(at, inputCode)
-
-      case _ =>
-        throw new CodeGenException("Unsupported type for input field access.")
-    }
-  }
-
-  private def generateNullLiteral(resultType: TypeInformation[_]): GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val defaultValue = primitiveDefaultValue(resultType)
-
-    if (nullCheck) {
-      val wrappedCode = s"""
-        |$resultTypeTerm $resultTerm = $defaultValue;
-        |boolean $nullTerm = true;
-        |""".stripMargin
-      GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType)
-    } else {
-      throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
-    }
-  }
-
-  private[flink] def generateNonNullLiteral(
-      literalType: TypeInformation[_],
-      literalCode: String)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(literalType)
-
-    val resultCode = if (nullCheck) {
-      s"""
-        |$resultTypeTerm $resultTerm = $literalCode;
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    } else {
-      s"""
-        |$resultTypeTerm $resultTerm = $literalCode;
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, resultCode, literalType)
-  }
-
-  private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
-    GeneratedExpression(
-      qualifyEnum(enum),
-      "false",
-      "",
-      new GenericTypeInfo(enum.getDeclaringClass))
-  }
-
-  /**
-    * Converts the external boxed format to an internal mostly primitive field representation.
-    * Wrapper types can autoboxed to their corresponding primitive type (Integer -> int). External
-    * objects are converted to their internal representation (Timestamp -> internal timestamp
-    * in long).
-    *
-    * @param fieldType type of field
-    * @param fieldTerm expression term of field to be unboxed
-    * @return internal unboxed field representation
-    */
-  private[flink] def generateInputFieldUnboxing(
-      fieldType: TypeInformation[_],
-      fieldTerm: String)
-    : GeneratedExpression = {
-    val tmpTerm = newName("tmp")
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val tmpTypeTerm = boxedTypeTermForTypeInfo(fieldType)
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
-    val defaultValue = primitiveDefaultValue(fieldType)
-
-    // explicit unboxing
-    val unboxedFieldCode = if (isTimePoint(fieldType)) {
-      timePointToInternalCode(fieldType, fieldTerm)
-    } else {
-      fieldTerm
-    }
-
-    val wrappedCode = if (nullCheck && !isReference(fieldType)) {
-      s"""
-        |$tmpTypeTerm $tmpTerm = $unboxedFieldCode;
-        |boolean $nullTerm = $tmpTerm == null;
-        |$resultTypeTerm $resultTerm;
-        |if ($nullTerm) {
-        |  $resultTerm = $defaultValue;
-        |}
-        |else {
-        |  $resultTerm = $tmpTerm;
-        |}
-        |""".stripMargin
-    } else if (nullCheck) {
-      s"""
-        |$resultTypeTerm $resultTerm = $unboxedFieldCode;
-        |boolean $nullTerm = $fieldTerm == null;
-        |""".stripMargin
-    } else {
-      s"""
-        |$resultTypeTerm $resultTerm = $unboxedFieldCode;
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, wrappedCode, fieldType)
-  }
-
-  /**
-    * Converts the internal mostly primitive field representation to an external boxed format.
-    * Primitive types can autoboxed to their corresponding object type (int -> Integer). Internal
-    * representations are converted to their external objects (internal timestamp
-    * in long -> Timestamp).
-    *
-    * @param expr expression to be boxed
-    * @return external boxed field representation
-    */
-  private[flink] def generateOutputFieldBoxing(expr: GeneratedExpression): GeneratedExpression = {
-    expr.resultType match {
-      // convert internal date/time/timestamp to java.sql.* objects
-      case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP =>
-        val resultTerm = newName("result")
-        val resultTypeTerm = boxedTypeTermForTypeInfo(expr.resultType)
-        val convMethod = internalToTimePointCode(expr.resultType, expr.resultTerm)
-
-        val resultCode = if (nullCheck) {
-          s"""
-            |${expr.code}
-            |$resultTypeTerm $resultTerm;
-            |if (${expr.nullTerm}) {
-            |  $resultTerm = null;
-            |}
-            |else {
-            |  $resultTerm = $convMethod;
-            |}
-            |""".stripMargin
-        } else {
-          s"""
-            |${expr.code}
-            |$resultTypeTerm $resultTerm = $convMethod;
-            |""".stripMargin
-        }
-
-        GeneratedExpression(resultTerm, expr.nullTerm, resultCode, expr.resultType)
-
-      // other types are autoboxed or need no boxing
-      case _ => expr
-    }
-  }
-
-  // ----------------------------------------------------------------------------------------------
-  // Reusable code snippets
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Adds a reusable output record to the member area of the generated [[Function]].
-    * The passed [[TypeInformation]] defines the type class to be instantiated.
-    *
-    * @param ti type information of type class to be instantiated during runtime
-    * @return member variable term
-    */
-  def addReusableOutRecord(ti: TypeInformation[_]): Unit = {
-    val statement = ti match {
-      case rt: RowTypeInfo =>
-        s"""
-          |transient ${ti.getTypeClass.getCanonicalName} $outRecordTerm =
-          |    new ${ti.getTypeClass.getCanonicalName}(${rt.getArity});
-          |""".stripMargin
-      case _ =>
-        s"""
-          |${ti.getTypeClass.getCanonicalName} $outRecordTerm =
-          |    new ${ti.getTypeClass.getCanonicalName}();
-          |""".stripMargin
-    }
-    reusableMemberStatements.add(statement)
-  }
-
-  /**
-    * Adds a reusable [[java.lang.reflect.Field]] to the member area of the generated [[Function]].
-    * The field can be used for accessing POJO fields more efficiently during runtime, however,
-    * the field does not have to be public.
-    *
-    * @param clazz class of containing field
-    * @param fieldName name of field to be extracted and instantiated during runtime
-    * @return member variable term
-    */
-  def addReusablePrivateFieldAccess(clazz: Class[_], fieldName: String): String = {
-    val fieldTerm = s"field_${clazz.getCanonicalName.replace('.', '$')}_$fieldName"
-    val fieldExtraction =
-      s"""
-        |transient java.lang.reflect.Field $fieldTerm =
-        |    org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
-        |      ${clazz.getCanonicalName}.class, "$fieldName");
-        |""".stripMargin
-    reusableMemberStatements.add(fieldExtraction)
-
-    val fieldAccessibility =
-      s"""
-        |$fieldTerm.setAccessible(true);
-        |""".stripMargin
-    reusableInitStatements.add(fieldAccessibility)
-
-    fieldTerm
-  }
-
-  /**
-    * Adds a reusable [[java.math.BigDecimal]] to the member area of the generated [[Function]].
-    *
-    * @param decimal decimal object to be instantiated during runtime
-    * @return member variable term
-    */
-  def addReusableDecimal(decimal: JBigDecimal): String = {
-    // values equal to one of the JDK constants reuse that constant and need no member
-    if (decimal == JBigDecimal.ZERO) {
-      "java.math.BigDecimal.ZERO"
-    } else if (decimal == JBigDecimal.ONE) {
-      "java.math.BigDecimal.ONE"
-    } else if (decimal == JBigDecimal.TEN) {
-      "java.math.BigDecimal.TEN"
-    } else {
-      // any other value is re-created from its string form in a freshly named member
-      val term = newName("decimal")
-      reusableMemberStatements.add(
-        s"""
-          |transient java.math.BigDecimal $term =
-          |    new java.math.BigDecimal("${decimal.toString}");
-          |""".stripMargin)
-      term
-    }
-  }
-
-  /**
-    * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
-    * The [[UserDefinedFunction]] must have a default constructor, however, it does not have
-    * to be public.
-    *
-    * @param function [[UserDefinedFunction]] object to be instantiated during runtime
-    * @return member variable term
-    */
-  def addReusableFunction(function: UserDefinedFunction): String = {
-    val udfClass = function.getClass.getCanonicalName
-    // dots replaced once; the result is the suffix of both generated variable names
-    val mangled = udfClass.replace('.', '$')
-    val fieldTerm = s"function_$mangled"
-    val ctorTerm = s"constructor_$mangled"
-
-    // member area: the function field starts out as null
-    reusableMemberStatements.add(
-      s"""
-        |transient $udfClass $fieldTerm = null;
-        |""".stripMargin)
-
-    // init area: obtain the no-argument constructor reflectively, lift its access check
-    // and assign a new instance to the field
-    reusableInitStatements.add(
-      s"""
-        |java.lang.reflect.Constructor $ctorTerm =
-        |  $udfClass.class.getDeclaredConstructor();
-        |$ctorTerm.setAccessible(true);
-        |$fieldTerm = ($udfClass) $ctorTerm.newInstance();
-       """.stripMargin)
-    fieldTerm
-  }
-
-  /**
-    * Adds a reusable array to the member area of the generated [[Function]].
-    */
-  def addReusableArray(clazz: Class[_], size: Int): String = {
-    val term = newName("array")
-    val arrayType = clazz.getCanonicalName // works also for int[] etc.
-    // the size goes right after the first '[' of the type name, e.g. "int[]" becomes "int[3]"
-    val allocation = arrayType.replaceFirst("\\[", s"[$size")
-    reusableMemberStatements.add(
-      s"""
-        |transient $arrayType $term =
-        |    new $allocation;
-        |""".stripMargin)
-    term
-  }
-
-  /**
-    * Adds a reusable timestamp to the beginning of the SAM of the generated [[Function]].
-    */
-  def addReusableTimestamp(): String = {
-    // fixed (not freshly generated) name: every call registers the very same statement text
-    val term = "timestamp"
-    reusablePerRecordStatements.add(
-      s"""
-        |final long $term = java.lang.System.currentTimeMillis();
-        |""".stripMargin)
-    term
-  }
-
-  /**
-    * Adds a reusable local timestamp to the beginning of the SAM of the generated [[Function]].
-    */
-  def addReusableLocalTimestamp(): String = {
-    val fieldTerm = "localtimestamp"
-
-    // the local timestamp is derived from the plain timestamp, so register that one first
-    val timestamp = addReusableTimestamp()
-
-    // Both occurrences of the timestamp go through the returned term. The offset lookup used to
-    // spell the variable name out literally, which only worked while the two names coincided.
-    val field =
-      s"""
-        |final long $fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset($timestamp);
-        |""".stripMargin
-    reusablePerRecordStatements.add(field)
-    fieldTerm
-  }
-
-  /**
-    * Adds a reusable time to the beginning of the SAM of the generated [[Function]].
-    */
-  def addReusableTime(): String = {
-    val fieldTerm = "time"
-
-    // the time of day is derived from the plain timestamp, so register that one first
-    val timestamp = addReusableTimestamp()
-
-    // adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
-    // The generated local is not declared final because it is adjusted right below: a negative
-    // remainder is shifted up by one day. All references go through $fieldTerm instead of a
-    // literal variable name.
-    val field =
-      s"""
-        |int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
-        |if ($fieldTerm < 0) {
-        |  $fieldTerm += ${DateTimeUtils.MILLIS_PER_DAY};
-        |}
-        |""".stripMargin
-    reusablePerRecordStatements.add(field)
-    fieldTerm
-  }
-
-  /**
-    * Adds a reusable local time to the beginning of the SAM of the generated [[Function]].
-    */
-  def addReusableLocalTime(): String = {
-    val term = "localtime"
-
-    // the local time of day is the local timestamp reduced to the milliseconds of its day
-    val localTimestampTerm = addReusableLocalTimestamp()
-
-    // adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
-    reusablePerRecordStatements.add(
-      s"""
-        |final int $term = (int) ($localTimestampTerm % ${DateTimeUtils.MILLIS_PER_DAY});
-        |""".stripMargin)
-    term
-  }
-
-
-  /**
-    * Adds a reusable date to the beginning of the SAM of the generated [[Function]].
-    */
-  def addReusableDate(): String = {
-    val fieldTerm = "date"
-
-    // the date is derived from the plain timestamp, so register that one first
-    val timestamp = addReusableTimestamp()
-
-    // adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
-    // Integer division truncates towards zero, so a timestamp with a negative remainder needs
-    // the day count lowered by one. The test is made on the raw remainder: the reusable time
-    // term is already shifted into the non-negative range and would never trigger it. The
-    // generated local is not declared final because it may be adjusted.
-    val field =
-      s"""
-        |int $fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
-        |if ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY} < 0) {
-        |  $fieldTerm -= 1;
-        |}
-        |""".stripMargin
-    reusablePerRecordStatements.add(field)
-    fieldTerm
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
deleted file mode 100644
index fce13ba..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Compiler.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.codehaus.commons.compiler.CompileException
-import org.codehaus.janino.SimpleCompiler
-
-trait Compiler[T] {
-
-  /**
-    * Compiles the given source code with Janino's [[SimpleCompiler]] and loads the class `name`
-    * from the class loader of that compiler.
-    *
-    * A [[CompileException]] raised while cooking the code is rethrown wrapped in an
-    * [[InvalidProgramException]].
-    *
-    * @param cl parent class loader for the compiler, must not be null
-    * @param name name of the class to load after compilation
-    * @param code source code to compile
-    * @return the loaded class, cast to `Class[T]` without a runtime check of `T`
-    */
-  @throws(classOf[CompileException])
-  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
-    require(cl != null, "Classloader must not be null.")
-
-    val janino = new SimpleCompiler()
-    janino.setParentClassLoader(cl)
-
-    try {
-      janino.cook(code)
-    } catch {
-      case cause: CompileException =>
-        val message = "Table program cannot be compiled. This is a bug. Please file an issue."
-        throw new InvalidProgramException(message, cause)
-    }
-
-    val compiled = janino.getClassLoader.loadClass(name)
-    compiled.asInstanceOf[Class[T]]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
deleted file mode 100644
index 871264e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-import java.util
-
-import org.apache.calcite.plan.RelOptPlanner
-import org.apache.calcite.rex.{RexBuilder, RexNode}
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Evaluates constant expressions using Flink's [[CodeGenerator]].
-  */
-class ExpressionReducer(config: TableConfig)
-  extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
-
-  private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
-  private val EMPTY_ROW = new Row(0)
-
-  /**
-    * Reduces the given constant expressions by generating, compiling and running a
-    * [[MapFunction]] that evaluates all of them into a single [[Row]].
-    *
-    * Expressions of SQL type ANY, ROW or ARRAY are not evaluated; they are passed through
-    * unchanged. For every entry of `constExprs` exactly one entry is appended to
-    * `reducedValues`, in the same order.
-    *
-    * @param rexBuilder builder used for the casts and the resulting literals
-    * @param constExprs constant expressions to reduce
-    * @param reducedValues output list receiving the literals or the untouched expressions
-    */
-  override def reduce(
-    rexBuilder: RexBuilder,
-    constExprs: util.List[RexNode],
-    reducedValues: util.List[RexNode]): Unit = {
-
-    val typeFactory = rexBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-    // cast targets for the time types; defs so that a type is only created when it is needed
-    def intType = typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO)
-    def longType = typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO)
-
-    val literals = constExprs.asScala.flatMap { expr =>
-      expr.getType.getSqlTypeName match {
-        // we need to cast here for RexBuilder.makeLiteral
-        case SqlTypeName.DATE | SqlTypeName.TIME =>
-          Some(rexBuilder.makeCast(intType, expr))
-        case SqlTypeName.TIMESTAMP =>
-          Some(rexBuilder.makeCast(longType, expr))
-
-        // we don't support object literals yet, we skip those constant expressions
-        case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY =>
-          None
-
-        case _ =>
-          Some(expr)
-      }
-    }
-
-    val literalTypes = literals.map(literal => FlinkTypeFactory.toTypeInfo(literal.getType))
-    val resultType = new RowTypeInfo(literalTypes: _*)
-
-    // generate MapFunction
-    val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
-
-    val result = generator.generateResultExpression(
-      resultType,
-      resultType.getFieldNames,
-      literals)
-
-    val body =
-      s"""
-        |${result.code}
-        |return ${result.resultTerm};
-        |""".stripMargin
-
-    val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
-      "ExpressionReducer",
-      classOf[MapFunction[Row, Row]],
-      body,
-      resultType.asInstanceOf[TypeInformation[Any]])
-
-    val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
-    val function = clazz.newInstance()
-
-    // execute
-    val reduced = function.map(EMPTY_ROW)
-
-    // add the reduced results or keep them unreduced;
-    // reducedIdx only advances for expressions that were actually evaluated
-    var reducedIdx = 0
-    for (unreduced <- constExprs.asScala) {
-      unreduced.getType.getSqlTypeName match {
-        // we insert the original expression for object literals
-        case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY =>
-          reducedValues.add(unreduced)
-        case _ =>
-          val literal =
-            rexBuilder.makeLiteral(reduced.getField(reducedIdx), unreduced.getType, true)
-          reducedValues.add(literal)
-          reducedIdx += 1
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
deleted file mode 100644
index c7d9a2e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-/**
-  * String interpolator that keeps multi-line arguments aligned: when an argument is spliced in
-  * right after a line break followed only by whitespace, that whitespace is repeated after
-  * every line break inside the argument.
-  */
-class IndentStringContext(sc: StringContext) {
-
-  /**
-    * Joins the parts of the wrapped [[StringContext]] with the string form of `args`.
-    *
-    * @param args values to splice between the parts
-    * @return the assembled string
-    */
-  def j(args: Any*): String = {
-    val out = new StringBuilder()
-    sc.parts.zip(args).foreach { case (part, arg) =>
-      out.append(part)
-
-      val rendered = arg.toString
-      val indent = getindent(part)
-      // the indent consists of whitespace only, so a literal replacement is sufficient
-      out.append(if (indent.isEmpty) rendered else rendered.replace("\n", "\n" + indent))
-    }
-    // zip stops at the shorter side; a surplus of parts means the last part is still missing
-    if (sc.parts.size > args.size) {
-      out.append(sc.parts.last)
-    }
-
-    out.toString()
-  }
-
-  /**
-    * Returns what follows the last line break of `str` if that is whitespace only, and the
-    * empty string otherwise (also when `str` has no line break at all).
-    */
-  def getindent(str: String): String = str.lastIndexOf("\n") match {
-    case -1 => ""
-    case lineBreak =>
-      val tail = str.substring(lineBreak + 1)
-      if (tail.trim.isEmpty) tail else ""
-  }
-}
-
-object Indenter {
-
-  /** Implicit view that wraps a [[StringContext]] into an [[IndentStringContext]]. */
-  implicit def toISC(sc: StringContext): IndentStringContext = {
-    new IndentStringContext(sc)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
deleted file mode 100644
index c7c7477..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/BuiltInMethods.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen.calls
-
-import java.math.{BigDecimal => JBigDecimal}
-
-import org.apache.calcite.linq4j.tree.Types
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.api.table.functions.utils.MathFunctions
-
-object BuiltInMethods {
-
-  // argument classes shared by the lookups below
-  private val DoubleArg = classOf[Double]
-  private val DecimalArg = classOf[JBigDecimal]
-  private val StringArg = classOf[String]
-
-  // looks up a method of java.lang.Math by name and argument classes
-  private def mathMethod(name: String, args: Class[_]*) =
-    Types.lookupMethod(classOf[Math], name, args: _*)
-
-  // looks up a method of Calcite's SqlFunctions by name and argument classes
-  private def sqlMethod(name: String, args: Class[_]*) =
-    Types.lookupMethod(classOf[SqlFunctions], name, args: _*)
-
-  val LOG10 = mathMethod("log10", DoubleArg)
-  val EXP = mathMethod("exp", DoubleArg)
-  val POWER = mathMethod("pow", DoubleArg, DoubleArg)
-  val POWER_DEC = Types.lookupMethod(classOf[MathFunctions], "power", DoubleArg, DecimalArg)
-  val LN = mathMethod("log", DoubleArg)
-  val ABS = sqlMethod("abs", DoubleArg)
-  val ABS_DEC = sqlMethod("abs", DecimalArg)
-  val LIKE_WITH_ESCAPE = sqlMethod("like", StringArg, StringArg, StringArg)
-  val SIMILAR_WITH_ESCAPE = sqlMethod("similar", StringArg, StringArg, StringArg)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
deleted file mode 100644
index 96d0b74..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-trait CallGenerator {
-
-  /**
-    * Generates the expression for one call.
-    *
-    * @param codeGenerator generator the call is generated for
-    * @param operands already generated operand expressions of the call
-    * @return the generated call expression
-    */
-  def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression])
-    : GeneratedExpression
-
-}
-
-object CallGenerator {
-
-  /**
-    * Generates a call whose result is computed from the result terms of its operands.
-    *
-    * With null checking enabled and at least one operand, the call is only evaluated when no
-    * operand is null; otherwise the result is set to the default value of the return type.
-    * With null checking enabled and no operands, the null term is declared as `false`.
-    * With null checking disabled, no null term is declared in the generated code.
-    *
-    * @param nullCheck whether null handling code is generated
-    * @param returnType type of the call result
-    * @param operands already generated operand expressions
-    * @param call produces the call expression from the operand result terms
-    * @return expression made of the operand code followed by the call
-    */
-  def generateCallIfArgsNotNull(
-      nullCheck: Boolean,
-      returnType: TypeInformation[_],
-      operands: Seq[GeneratedExpression])
-      (call: (Seq[String]) => String)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
-    val defaultValue = primitiveDefaultValue(returnType)
-
-    // pieces every variant needs: the operands' own code and the call expression
-    val operandCode = operands.map(_.code).mkString("\n")
-    val callCode = call(operands.map(_.resultTerm))
-
-    val resultCode =
-      if (!nullCheck) {
-        s"""
-          |$operandCode
-          |$resultTypeTerm $resultTerm = $callCode;
-          |""".stripMargin
-      } else if (operands.isEmpty) {
-        s"""
-          |$operandCode
-          |boolean $nullTerm = false;
-          |$resultTypeTerm $resultTerm = $callCode;
-          |""".stripMargin
-      } else {
-        // the result is null as soon as any operand is null
-        val anyOperandNull = operands.map(_.nullTerm).mkString(" || ")
-        s"""
-          |$operandCode
-          |boolean $nullTerm = $anyOperandNull;
-          |$resultTypeTerm $resultTerm;
-          |if ($nullTerm) {
-          |  $resultTerm = $defaultValue;
-          |}
-          |else {
-          |  $resultTerm = $callCode;
-          |}
-          |""".stripMargin
-      }
-
-    GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala
deleted file mode 100644
index 4aaa209..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CurrentTimePointCallGen.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
-  * Generates function call to determine current time point (as date/time/timestamp) in
-  * local timezone or not.
-  */
-class CurrentTimePointCallGen(
-    targetType: TypeInformation[_],
-    local: Boolean)
-  extends CallGenerator {
-
-  /**
-    * Registers the reusable time point matching `targetType` and `local` at the code generator
-    * and wraps the returned term in a non-null literal of the target type.
-    *
-    * The operands are not used. For dates, `local` makes no difference. A target type other
-    * than DATE, TIME or TIMESTAMP is not handled by the match.
-    */
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-
-    // pick the reusable term first, the literal is built the same way for all of them
-    val term = targetType match {
-      case SqlTimeTypeInfo.DATE =>
-        codeGenerator.addReusableDate()
-
-      case SqlTimeTypeInfo.TIME =>
-        if (local) codeGenerator.addReusableLocalTime()
-        else codeGenerator.addReusableTime()
-
-      case SqlTimeTypeInfo.TIMESTAMP =>
-        if (local) codeGenerator.addReusableLocalTimestamp()
-        else codeGenerator.addReusableTimestamp()
-    }
-
-    codeGenerator.generateNonNullLiteral(targetType, term)
-  }
-
-}