You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/07/17 10:12:28 UTC

[2/2] flink git commit: [FLINK-6887] [table] Split up CodeGenerator into several specific CodeGenerator

[FLINK-6887] [table] Split up CodeGenerator into several specific CodeGenerator

This closes #4171.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/527e7499
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/527e7499
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/527e7499

Branch: refs/heads/master
Commit: 527e7499c5807be138d1ba0a278917190a7d4cf1
Parents: ef0653a
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Jun 23 16:29:20 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Mon Jul 17 16:34:31 2017 +0800

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      |   4 +-
 .../codegen/AggregationCodeGenerator.scala      | 436 +++++++++++++
 .../flink/table/codegen/CodeGenerator.scala     | 641 +------------------
 .../table/codegen/CollectorCodeGenerator.scala  | 100 +++
 .../flink/table/codegen/ExpressionReducer.scala |   4 +-
 .../table/codegen/FunctionCodeGenerator.scala   | 177 +++++
 .../codegen/InputFormatCodeGenerator.scala      |  92 +++
 .../flink/table/plan/nodes/CommonCalc.scala     |   4 +-
 .../table/plan/nodes/CommonCorrelate.scala      |  16 +-
 .../flink/table/plan/nodes/CommonScan.scala     |   4 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   4 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  |   4 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   4 +-
 .../nodes/dataset/DataSetSingleRowJoin.scala    |   4 +-
 .../plan/nodes/dataset/DataSetValues.scala      |   4 +-
 .../nodes/dataset/DataSetWindowAggregate.scala  |  10 +-
 .../plan/nodes/datastream/DataStreamCalc.scala  |   4 +-
 .../datastream/DataStreamGroupAggregate.scala   |   5 +-
 .../DataStreamGroupWindowAggregate.scala        |   6 +-
 .../datastream/DataStreamOverAggregate.scala    |   8 +-
 .../nodes/datastream/DataStreamValues.scala     |   4 +-
 .../table/runtime/aggregate/AggregateUtil.scala |  22 +-
 .../expressions/utils/ExpressionTestBase.scala  |   4 +-
 23 files changed, 872 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 48e33cc..252b0eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, ExpressionReducer, GeneratedFunction}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.AggregateFunction
@@ -790,7 +790,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
 
     // code generate MapFunction
-    val generator = new CodeGenerator(
+    val generator = new FunctionCodeGenerator(
       config,
       false,
       inputTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
new file mode 100644
index 0000000..680eb44
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import java.lang.reflect.ParameterizedType
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.codegen.CodeGenUtils.newName
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, SingleElementIterable}
+
+/**
+  * A code generator for generating [[GeneratedAggregations]].
+  *
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input type information about the input of the Function
+  */
+class AggregationCodeGenerator(
+    config: TableConfig,
+    nullableInput: Boolean,
+    input: TypeInformation[_ <: Any])
+  extends CodeGenerator(config, nullableInput, input) {
+
+  /**
+    * Generates a [[org.apache.flink.table.runtime.aggregate.GeneratedAggregations]] that can be
+    * passed to a Java compiler.
+    *
+    * @param name        Class name of the function.
+    *                    Does not need to be unique but has to be a valid Java class identifier.
+    * @param generator   The code generator instance
+    * @param physicalInputTypes Physical input row types
+    * @param aggregates  All aggregate functions
+    * @param aggFields   Indexes of the input fields for all aggregate functions
+    * @param aggMapping  The mapping of aggregates to output fields
+    * @param partialResults A flag defining whether final or partial results (accumulators) are set
+    *                       to the output row.
+    * @param fwdMapping  The mapping of input fields to output fields
+    * @param mergeMapping An optional mapping to specify the accumulators to merge. If not set, we
+    *                     assume that both rows have the accumulators at the same position.
+    * @param constantFlags An optional parameter to define where to set constant boolean flags in
+    *                      the output row.
+    * @param outputArity The number of fields in the output row.
+    * @param needRetract a flag to indicate if the aggregate needs the retract method
+    * @param needMerge a flag to indicate if the aggregate needs the merge method
+    * @param needReset a flag to indicate if the aggregate needs the resetAccumulator method
+    *
+    * @return A GeneratedAggregationsFunction
+    */
+  def generateAggregations(
+    name: String,
+    generator: CodeGenerator,
+    physicalInputTypes: Seq[TypeInformation[_]],
+    aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
+    aggFields: Array[Array[Int]],
+    aggMapping: Array[Int],
+    partialResults: Boolean,
+    fwdMapping: Array[Int],
+    mergeMapping: Option[Array[Int]],
+    constantFlags: Option[Array[(Int, Boolean)]],
+    outputArity: Int,
+    needRetract: Boolean,
+    needMerge: Boolean,
+    needReset: Boolean)
+  : GeneratedAggregationsFunction = {
+
+    // get unique function name
+    val funcName = newName(name)
+    // register UDAGGs
+    val aggs = aggregates.map(a => generator.addReusableFunction(a))
+    // get java types of accumulators
+    val accTypeClasses = aggregates.map { a =>
+      a.getClass.getMethod("createAccumulator").getReturnType
+    }
+    val accTypes = accTypeClasses.map(_.getCanonicalName)
+
+    // get java classes of input fields
+    val javaClasses = physicalInputTypes.map(t => t.getTypeClass)
+    // get parameter lists for aggregation functions
+    val parameters = aggFields.map { inFields =>
+      val fields = for (f <- inFields) yield
+        s"(${javaClasses(f).getCanonicalName}) input.getField($f)"
+      fields.mkString(", ")
+    }
+    val methodSignaturesList = aggFields.map {
+      inFields => for (f <- inFields) yield javaClasses(f)
+    }
+
+    // check and validate the needed methods
+    aggregates.zipWithIndex.map {
+      case (a, i) => {
+        getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
+        .getOrElse(
+          throw new CodeGenException(
+            s"No matching accumulate method found for AggregateFunction " +
+              s"'${a.getClass.getCanonicalName}'" +
+              s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
+        )
+
+        if (needRetract) {
+          getUserDefinedMethod(a, "retract", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
+          .getOrElse(
+            throw new CodeGenException(
+              s"No matching retract method found for AggregateFunction " +
+                s"'${a.getClass.getCanonicalName}'" +
+                s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
+          )
+        }
+
+        if (needMerge) {
+          val methods =
+            getUserDefinedMethod(a, "merge", Array(accTypeClasses(i), classOf[JIterable[Any]]))
+            .getOrElse(
+              throw new CodeGenException(
+                s"No matching merge method found for AggregateFunction " +
+                  s"${a.getClass.getCanonicalName}'.")
+            )
+
+          var iterableTypeClass = methods.getGenericParameterTypes.apply(1)
+                                  .asInstanceOf[ParameterizedType].getActualTypeArguments.apply(0)
+          // further extract iterableTypeClass if the accumulator has generic type
+          iterableTypeClass match {
+            case impl: ParameterizedType => iterableTypeClass = impl.getRawType
+            case _ =>
+          }
+
+          if (iterableTypeClass != accTypeClasses(i)) {
+            throw new CodeGenException(
+              s"merge method in AggregateFunction ${a.getClass.getCanonicalName} does not have " +
+                s"the correct Iterable type. Actually: ${iterableTypeClass.toString}. " +
+                s"Expected: ${accTypeClasses(i).toString}")
+          }
+        }
+
+        if (needReset) {
+          getUserDefinedMethod(a, "resetAccumulator", Array(accTypeClasses(i)))
+          .getOrElse(
+            throw new CodeGenException(
+              s"No matching resetAccumulator method found for " +
+                s"aggregate ${a.getClass.getCanonicalName}'.")
+          )
+        }
+      }
+    }
+
+    def genSetAggregationResults: String = {
+
+      val sig: String =
+        j"""
+           |  public final void setAggregationResults(
+           |    org.apache.flink.types.Row accs,
+           |    org.apache.flink.types.Row output)""".stripMargin
+
+      val setAggs: String = {
+        for (i <- aggs.indices) yield
+
+          if (partialResults) {
+            j"""
+               |    output.setField(
+               |      ${aggMapping(i)},
+               |      (${accTypes(i)}) accs.getField($i));""".stripMargin
+          } else {
+            j"""
+               |    org.apache.flink.table.functions.AggregateFunction baseClass$i =
+               |      (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+               |
+               |    output.setField(
+               |      ${aggMapping(i)},
+               |      baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin
+          }
+      }.mkString("\n")
+
+      j"""
+         |$sig {
+         |$setAggs
+         |  }""".stripMargin
+    }
+
+    def genAccumulate: String = {
+
+      val sig: String =
+        j"""
+           |  public final void accumulate(
+           |    org.apache.flink.types.Row accs,
+           |    org.apache.flink.types.Row input)""".stripMargin
+
+      val accumulate: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |    ${aggs(i)}.accumulate(
+             |      ((${accTypes(i)}) accs.getField($i)),
+             |      ${parameters(i)});""".stripMargin
+      }.mkString("\n")
+
+      j"""$sig {
+         |$accumulate
+         |  }""".stripMargin
+    }
+
+    def genRetract: String = {
+
+      val sig: String =
+        j"""
+           |  public final void retract(
+           |    org.apache.flink.types.Row accs,
+           |    org.apache.flink.types.Row input)""".stripMargin
+
+      val retract: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |    ${aggs(i)}.retract(
+             |      ((${accTypes(i)}) accs.getField($i)),
+             |      ${parameters(i)});""".stripMargin
+      }.mkString("\n")
+
+      if (needRetract) {
+        j"""
+           |$sig {
+           |$retract
+           |  }""".stripMargin
+      } else {
+        j"""
+           |$sig {
+           |  }""".stripMargin
+      }
+    }
+
+    def genCreateAccumulators: String = {
+
+      val sig: String =
+        j"""
+           |  public final org.apache.flink.types.Row createAccumulators()
+           |    """.stripMargin
+      val init: String =
+        j"""
+           |      org.apache.flink.types.Row accs =
+           |          new org.apache.flink.types.Row(${aggs.length});"""
+        .stripMargin
+      val create: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |    accs.setField(
+             |      $i,
+             |      ${aggs(i)}.createAccumulator());"""
+          .stripMargin
+      }.mkString("\n")
+      val ret: String =
+        j"""
+           |      return accs;"""
+        .stripMargin
+
+      j"""$sig {
+         |$init
+         |$create
+         |$ret
+         |  }""".stripMargin
+    }
+
+    def genSetForwardedFields: String = {
+
+      val sig: String =
+        j"""
+           |  public final void setForwardedFields(
+           |    org.apache.flink.types.Row input,
+           |    org.apache.flink.types.Row output)
+           |    """.stripMargin
+
+      val forward: String = {
+        for (i <- fwdMapping.indices if fwdMapping(i) >= 0) yield
+          {
+            j"""
+               |    output.setField(
+               |      $i,
+               |      input.getField(${fwdMapping(i)}));"""
+            .stripMargin
+          }
+      }.mkString("\n")
+
+      j"""$sig {
+         |$forward
+         |  }""".stripMargin
+    }
+
+    def genSetConstantFlags: String = {
+
+      val sig: String =
+        j"""
+           |  public final void setConstantFlags(org.apache.flink.types.Row output)
+           |    """.stripMargin
+
+      val setFlags: String = if (constantFlags.isDefined) {
+        {
+          for (cf <- constantFlags.get) yield {
+            j"""
+               |    output.setField(${cf._1}, ${if (cf._2) "true" else "false"});"""
+            .stripMargin
+          }
+        }.mkString("\n")
+      } else {
+        ""
+      }
+
+      j"""$sig {
+         |$setFlags
+         |  }""".stripMargin
+    }
+
+    def genCreateOutputRow: String = {
+      j"""
+         |  public final org.apache.flink.types.Row createOutputRow() {
+         |    return new org.apache.flink.types.Row($outputArity);
+         |  }""".stripMargin
+    }
+
+    def genMergeAccumulatorsPair: String = {
+
+      val mapping = mergeMapping.getOrElse(aggs.indices.toArray)
+
+      val sig: String =
+        j"""
+           |  public final org.apache.flink.types.Row mergeAccumulatorsPair(
+           |    org.apache.flink.types.Row a,
+           |    org.apache.flink.types.Row b)
+           """.stripMargin
+      val merge: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
+             |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
+             |    accIt$i.setElement(bAcc$i);
+             |    ${aggs(i)}.merge(aAcc$i, accIt$i);
+             |    a.setField($i, aAcc$i);
+          """.stripMargin
+      }.mkString("\n")
+      val ret: String =
+        j"""
+           |      return a;
+           """.stripMargin
+
+      if (needMerge) {
+        j"""
+           |$sig {
+           |$merge
+           |$ret
+           |  }""".stripMargin
+      } else {
+        j"""
+           |$sig {
+           |$ret
+           |  }""".stripMargin
+      }
+    }
+
+    def genMergeList: String = {
+      {
+        val singleIterableClass = classOf[SingleElementIterable[_]].getCanonicalName
+        for (i <- accTypes.indices) yield
+          j"""
+             |    private final $singleIterableClass<${accTypes(i)}> accIt$i =
+             |      new $singleIterableClass<${accTypes(i)}>();
+             """.stripMargin
+      }.mkString("\n")
+    }
+
+    def genResetAccumulator: String = {
+
+      val sig: String =
+        j"""
+           |  public final void resetAccumulator(
+           |    org.apache.flink.types.Row accs)""".stripMargin
+
+      val reset: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |    ${aggs(i)}.resetAccumulator(
+             |      ((${accTypes(i)}) accs.getField($i)));""".stripMargin
+      }.mkString("\n")
+
+      if (needReset) {
+        j"""$sig {
+           |$reset
+           |  }""".stripMargin
+      } else {
+        j"""$sig {
+           |  }""".stripMargin
+      }
+    }
+
+    val generatedAggregationsClass = classOf[GeneratedAggregations].getCanonicalName
+    var funcCode =
+      j"""
+         |public final class $funcName extends $generatedAggregationsClass {
+         |
+         |  ${reuseMemberCode()}
+         |  $genMergeList
+         |  public $funcName() throws Exception {
+         |    ${reuseInitCode()}
+         |  }
+         |  ${reuseConstructorCode(funcName)}
+         |
+         """.stripMargin
+
+    funcCode += genSetAggregationResults + "\n"
+    funcCode += genAccumulate + "\n"
+    funcCode += genRetract + "\n"
+    funcCode += genCreateAccumulators + "\n"
+    funcCode += genSetForwardedFields + "\n"
+    funcCode += genSetConstantFlags + "\n"
+    funcCode += genCreateOutputRow + "\n"
+    funcCode += genMergeAccumulatorsPair + "\n"
+    funcCode += genResetAccumulator + "\n"
+    funcCode += "}"
+
+    GeneratedAggregationsFunction(funcName, funcCode)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index e8bcdcf..af16b51 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.codegen
 
-import java.lang.reflect.ParameterizedType
-import java.lang.{Iterable => JIterable}
 import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.avatica.util.DateTimeUtils
@@ -28,25 +26,21 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.io.GenericInputFormat
 import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
-import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.table.codegen.calls.{BuiltInMethods, FunctionGenerator}
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.functions.sql.ScalarSqlFunctions
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
-import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
-import org.apache.flink.table.runtime.TableFunctionCollector
+import org.apache.flink.table.functions.{FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.types.Row
 
@@ -54,7 +48,9 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 /**
-  * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s.
+  * [[CodeGenerator]] is the base code generator for generating Flink
+  * [[org.apache.flink.api.common.functions.Function]]s.
+  * It is responsible for expression generation and tracks the context (member variables etc).
   *
   * @param config configuration that determines runtime behavior
   * @param nullableInput input(s) can be null.
@@ -65,7 +61,7 @@ import scala.collection.mutable
   * @param input2FieldMapping additional mapping information for input2
   *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
   */
-class CodeGenerator(
+abstract class CodeGenerator(
     config: TableConfig,
     nullableInput: Boolean,
     input1: TypeInformation[_ <: Any],
@@ -95,12 +91,12 @@ class CodeGenerator(
     case _ => // ok
   }
 
-  private val input1Mapping = input1FieldMapping match {
+  protected val input1Mapping: Array[Int] = input1FieldMapping match {
     case Some(mapping) => mapping
     case _ => (0 until input1.getArity).toArray
   }
 
-  private val input2Mapping = input2FieldMapping match {
+  protected val input2Mapping: Array[Int] = input2FieldMapping match {
     case Some(mapping) => mapping
     case _ => input2 match {
       case Some(input) => (0 until input.getArity).toArray
@@ -108,31 +104,6 @@ class CodeGenerator(
     }
   }
 
-  /**
-    * A code generator for generating unary Flink
-    * [[org.apache.flink.api.common.functions.Function]]s with one input.
-    *
-    * @param config configuration that determines runtime behavior
-    * @param nullableInput input(s) can be null.
-    * @param input type information about the input of the Function
-    * @param inputFieldMapping additional mapping information necessary for input
-    *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
-    */
-  def this(
-      config: TableConfig,
-      nullableInput: Boolean,
-      input: TypeInformation[Any],
-      inputFieldMapping: Array[Int]) =
-    this(config, nullableInput, input, None, Some(inputFieldMapping))
-
-  /**
-    * A code generator for generating Flink input formats.
-    *
-    * @param config configuration that determines runtime behavior
-    */
-  def this(config: TableConfig) =
-    this(config, false, new RowTypeInfo(), None, None)
-
   // set of member statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
   private val reusableMemberStatements = mutable.LinkedHashSet[String]()
@@ -261,604 +232,6 @@ class CodeGenerator(
   }
 
   /**
-    * Generates a [[org.apache.flink.table.runtime.aggregate.GeneratedAggregations]] that can be
-    * passed to a Java compiler.
-    *
-    * @param name        Class name of the function.
-    *                    Does not need to be unique but has to be a valid Java class identifier.
-    * @param generator   The code generator instance
-    * @param physicalInputTypes Physical input row types
-    * @param aggregates  All aggregate functions
-    * @param aggFields   Indexes of the input fields for all aggregate functions
-    * @param aggMapping  The mapping of aggregates to output fields
-    * @param partialResults A flag defining whether final or partial results (accumulators) are set
-    *                       to the output row.
-    * @param fwdMapping  The mapping of input fields to output fields
-    * @param mergeMapping An optional mapping to specify the accumulators to merge. If not set, we
-    *                     assume that both rows have the accumulators at the same position.
-    * @param constantFlags An optional parameter to define where to set constant boolean flags in
-    *                      the output row.
-    * @param outputArity The number of fields in the output row.
-    * @param needRetract a flag to indicate if the aggregate needs the retract method
-    * @param needMerge a flag to indicate if the aggregate needs the merge method
-    * @param needReset a flag to indicate if the aggregate needs the resetAccumulator method
-    *
-    * @return A GeneratedAggregationsFunction
-    */
-  def generateAggregations(
-      name: String,
-      generator: CodeGenerator,
-      physicalInputTypes: Seq[TypeInformation[_]],
-      aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
-      aggFields: Array[Array[Int]],
-      aggMapping: Array[Int],
-      partialResults: Boolean,
-      fwdMapping: Array[Int],
-      mergeMapping: Option[Array[Int]],
-      constantFlags: Option[Array[(Int, Boolean)]],
-      outputArity: Int,
-      needRetract: Boolean,
-      needMerge: Boolean,
-      needReset: Boolean)
-  : GeneratedAggregationsFunction = {
-
-    // get unique function name
-    val funcName = newName(name)
-    // register UDAGGs
-    val aggs = aggregates.map(a => generator.addReusableFunction(a))
-    // get java types of accumulators
-    val accTypeClasses = aggregates.map { a =>
-      a.getClass.getMethod("createAccumulator").getReturnType
-    }
-    val accTypes = accTypeClasses.map(_.getCanonicalName)
-
-    // get java classes of input fields
-    val javaClasses = physicalInputTypes.map(t => t.getTypeClass)
-    // get parameter lists for aggregation functions
-    val parameters = aggFields.map { inFields =>
-      val fields = for (f <- inFields) yield
-        s"(${javaClasses(f).getCanonicalName}) input.getField($f)"
-      fields.mkString(", ")
-    }
-    val methodSignaturesList = aggFields.map {
-      inFields => for (f <- inFields) yield javaClasses(f)
-    }
-
-    // check and validate the needed methods
-    aggregates.zipWithIndex.map {
-      case (a, i) => {
-        getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
-          .getOrElse(
-            throw new CodeGenException(
-              s"No matching accumulate method found for AggregateFunction " +
-                s"'${a.getClass.getCanonicalName}'" +
-                s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
-          )
-
-        if (needRetract) {
-          getUserDefinedMethod(a, "retract", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
-            .getOrElse(
-              throw new CodeGenException(
-                s"No matching retract method found for AggregateFunction " +
-                  s"'${a.getClass.getCanonicalName}'" +
-                  s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
-            )
-        }
-
-        if (needMerge) {
-          val methods =
-            getUserDefinedMethod(a, "merge", Array(accTypeClasses(i), classOf[JIterable[Any]]))
-              .getOrElse(
-                throw new CodeGenException(
-                  s"No matching merge method found for AggregateFunction " +
-                    s"${a.getClass.getCanonicalName}'.")
-              )
-
-          var iterableTypeClass = methods.getGenericParameterTypes.apply(1)
-            .asInstanceOf[ParameterizedType].getActualTypeArguments.apply(0)
-          // further extract iterableTypeClass if the accumulator has generic type
-          iterableTypeClass match {
-            case impl: ParameterizedType => iterableTypeClass = impl.getRawType
-            case _ =>
-          }
-
-          if (iterableTypeClass != accTypeClasses(i)) {
-            throw new CodeGenException(
-              s"merge method in AggregateFunction ${a.getClass.getCanonicalName} does not have " +
-                s"the correct Iterable type. Actually: ${iterableTypeClass.toString}. " +
-                s"Expected: ${accTypeClasses(i).toString}")
-          }
-        }
-
-        if (needReset) {
-          getUserDefinedMethod(a, "resetAccumulator", Array(accTypeClasses(i)))
-            .getOrElse(
-              throw new CodeGenException(
-                s"No matching resetAccumulator method found for " +
-                  s"aggregate ${a.getClass.getCanonicalName}'.")
-            )
-        }
-      }
-    }
-
-    def genSetAggregationResults: String = {
-
-      val sig: String =
-        j"""
-           |  public final void setAggregationResults(
-           |    org.apache.flink.types.Row accs,
-           |    org.apache.flink.types.Row output)""".stripMargin
-
-      val setAggs: String = {
-        for (i <- aggs.indices) yield
-
-          if (partialResults) {
-            j"""
-               |    output.setField(
-               |      ${aggMapping(i)},
-               |      (${accTypes(i)}) accs.getField($i));""".stripMargin
-          } else {
-            j"""
-               |    org.apache.flink.table.functions.AggregateFunction baseClass$i =
-               |      (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
-               |
-               |    output.setField(
-               |      ${aggMapping(i)},
-               |      baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin
-          }
-      }.mkString("\n")
-
-      j"""
-         |$sig {
-         |$setAggs
-         |  }""".stripMargin
-    }
-
-    def genAccumulate: String = {
-
-      val sig: String =
-        j"""
-            |  public final void accumulate(
-            |    org.apache.flink.types.Row accs,
-            |    org.apache.flink.types.Row input)""".stripMargin
-
-      val accumulate: String = {
-        for (i <- aggs.indices) yield
-          j"""
-             |    ${aggs(i)}.accumulate(
-             |      ((${accTypes(i)}) accs.getField($i)),
-             |      ${parameters(i)});""".stripMargin
-      }.mkString("\n")
-
-      j"""$sig {
-         |$accumulate
-         |  }""".stripMargin
-    }
-
-    def genRetract: String = {
-
-      val sig: String =
-        j"""
-            |  public final void retract(
-            |    org.apache.flink.types.Row accs,
-            |    org.apache.flink.types.Row input)""".stripMargin
-
-      val retract: String = {
-        for (i <- aggs.indices) yield
-          j"""
-             |    ${aggs(i)}.retract(
-             |      ((${accTypes(i)}) accs.getField($i)),
-             |      ${parameters(i)});""".stripMargin
-      }.mkString("\n")
-
-      if (needRetract) {
-        j"""
-           |$sig {
-           |$retract
-           |  }""".stripMargin
-      } else {
-        j"""
-           |$sig {
-           |  }""".stripMargin
-      }
-    }
-
-    def genCreateAccumulators: String = {
-
-      val sig: String =
-        j"""
-           |  public final org.apache.flink.types.Row createAccumulators()
-           |    """.stripMargin
-      val init: String =
-        j"""
-           |      org.apache.flink.types.Row accs =
-           |          new org.apache.flink.types.Row(${aggs.length});"""
-          .stripMargin
-      val create: String = {
-        for (i <- aggs.indices) yield
-          j"""
-             |    accs.setField(
-             |      $i,
-             |      ${aggs(i)}.createAccumulator());"""
-            .stripMargin
-      }.mkString("\n")
-      val ret: String =
-        j"""
-           |      return accs;"""
-          .stripMargin
-
-      j"""$sig {
-         |$init
-         |$create
-         |$ret
-         |  }""".stripMargin
-    }
-
-    def genSetForwardedFields: String = {
-
-      val sig: String =
-        j"""
-           |  public final void setForwardedFields(
-           |    org.apache.flink.types.Row input,
-           |    org.apache.flink.types.Row output)
-           |    """.stripMargin
-
-      val forward: String = {
-        for (i <- fwdMapping.indices if fwdMapping(i) >= 0) yield
-          {
-            j"""
-               |    output.setField(
-               |      $i,
-               |      input.getField(${fwdMapping(i)}));"""
-              .stripMargin
-          }
-      }.mkString("\n")
-
-      j"""$sig {
-         |$forward
-         |  }""".stripMargin
-    }
-
-    def genSetConstantFlags: String = {
-
-      val sig: String =
-        j"""
-           |  public final void setConstantFlags(org.apache.flink.types.Row output)
-           |    """.stripMargin
-
-      val setFlags: String = if (constantFlags.isDefined) {
-        {
-          for (cf <- constantFlags.get) yield {
-            j"""
-               |    output.setField(${cf._1}, ${if (cf._2) "true" else "false"});"""
-              .stripMargin
-          }
-        }.mkString("\n")
-      } else {
-        ""
-      }
-
-      j"""$sig {
-         |$setFlags
-         |  }""".stripMargin
-    }
-
-    def genCreateOutputRow: String = {
-      j"""
-         |  public final org.apache.flink.types.Row createOutputRow() {
-         |    return new org.apache.flink.types.Row($outputArity);
-         |  }""".stripMargin
-    }
-
-    def genMergeAccumulatorsPair: String = {
-
-      val mapping = mergeMapping.getOrElse(aggs.indices.toArray)
-
-      val sig: String =
-        j"""
-           |  public final org.apache.flink.types.Row mergeAccumulatorsPair(
-           |    org.apache.flink.types.Row a,
-           |    org.apache.flink.types.Row b)
-           """.stripMargin
-      val merge: String = {
-        for (i <- aggs.indices) yield
-          j"""
-             |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
-             |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
-             |    accIt$i.setElement(bAcc$i);
-             |    ${aggs(i)}.merge(aAcc$i, accIt$i);
-             |    a.setField($i, aAcc$i);
-             """.stripMargin
-      }.mkString("\n")
-      val ret: String =
-        j"""
-           |      return a;
-           """.stripMargin
-
-      if (needMerge) {
-        j"""
-           |$sig {
-           |$merge
-           |$ret
-           |  }""".stripMargin
-      } else {
-        j"""
-           |$sig {
-           |$ret
-           |  }""".stripMargin
-      }
-    }
-
-    def genMergeList: String = {
-      {
-        val singleIterableClass = "org.apache.flink.table.runtime.aggregate.SingleElementIterable"
-        for (i <- accTypes.indices) yield
-          j"""
-             |    private final $singleIterableClass<${accTypes(i)}> accIt$i =
-             |      new $singleIterableClass<${accTypes(i)}>();
-             """.stripMargin
-      }.mkString("\n")
-    }
-
-    def genResetAccumulator: String = {
-
-      val sig: String =
-        j"""
-           |  public final void resetAccumulator(
-           |    org.apache.flink.types.Row accs)""".stripMargin
-
-      val reset: String = {
-        for (i <- aggs.indices) yield
-          j"""
-             |    ${aggs(i)}.resetAccumulator(
-             |      ((${accTypes(i)}) accs.getField($i)));""".stripMargin
-      }.mkString("\n")
-
-      if (needReset) {
-        j"""$sig {
-           |$reset
-           |  }""".stripMargin
-      } else {
-        j"""$sig {
-           |  }""".stripMargin
-      }
-    }
-
-    var funcCode =
-      j"""
-         |public final class $funcName
-         |  extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
-         |
-         |  ${reuseMemberCode()}
-         |  $genMergeList
-         |  public $funcName() throws Exception {
-         |    ${reuseInitCode()}
-         |  }
-         |  ${reuseConstructorCode(funcName)}
-         |
-         """.stripMargin
-
-    funcCode += genSetAggregationResults + "\n"
-    funcCode += genAccumulate + "\n"
-    funcCode += genRetract + "\n"
-    funcCode += genCreateAccumulators + "\n"
-    funcCode += genSetForwardedFields + "\n"
-    funcCode += genSetConstantFlags + "\n"
-    funcCode += genCreateOutputRow + "\n"
-    funcCode += genMergeAccumulatorsPair + "\n"
-    funcCode += genResetAccumulator + "\n"
-    funcCode += "}"
-
-    GeneratedAggregationsFunction(funcName, funcCode)
-  }
-
-  /**
-    * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java
-    * compiler.
-    *
-    * @param name Class name of the Function. Must not be unique but has to be a valid Java class
-    *             identifier.
-    * @param clazz Flink Function to be generated.
-    * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
-    *                 output record can be accessed via the given term methods.
-    * @param returnType expected return type
-    * @tparam F Flink Function to be generated.
-    * @tparam T Return type of the Flink Function.
-    * @return instance of GeneratedFunction
-    */
-  def generateFunction[F <: Function, T <: Any](
-      name: String,
-      clazz: Class[F],
-      bodyCode: String,
-      returnType: TypeInformation[T])
-    : GeneratedFunction[F, T] = {
-    val funcName = newName(name)
-
-    // Janino does not support generics, that's why we need
-    // manual casting here
-    val samHeader =
-      // FlatMapFunction
-      if (clazz == classOf[FlatMapFunction[_, _]]) {
-        val baseClass = classOf[RichFlatMapFunction[_, _]]
-        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
-        (baseClass,
-          s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
-          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
-      }
-
-      // MapFunction
-      else if (clazz == classOf[MapFunction[_, _]]) {
-        val baseClass = classOf[RichMapFunction[_, _]]
-        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
-        (baseClass,
-          "Object map(Object _in1)",
-          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
-      }
-
-      // FlatJoinFunction
-      else if (clazz == classOf[FlatJoinFunction[_, _, _]]) {
-        val baseClass = classOf[RichFlatJoinFunction[_, _, _]]
-        val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
-        val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
-          throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
-        (baseClass,
-          s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
-          List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
-               s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
-      }
-
-      // ProcessFunction
-      else if (clazz == classOf[ProcessFunction[_, _]]) {
-        val baseClass = classOf[ProcessFunction[_, _]]
-        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
-        (baseClass,
-          s"void processElement(Object _in1, " +
-            s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," +
-            s"org.apache.flink.util.Collector $collectorTerm)",
-          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
-      }
-      else {
-        // TODO more functions
-        throw new CodeGenException("Unsupported Function.")
-      }
-
-    val funcCode = j"""
-      public class $funcName
-          extends ${samHeader._1.getCanonicalName} {
-
-        ${reuseMemberCode()}
-
-        public $funcName() throws Exception {
-          ${reuseInitCode()}
-        }
-
-        ${reuseConstructorCode(funcName)}
-
-        @Override
-        public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception {
-          ${reuseOpenCode()}
-        }
-
-        @Override
-        public ${samHeader._2} throws Exception {
-          ${samHeader._3.mkString("\n")}
-          ${reusePerRecordCode()}
-          ${reuseInputUnboxingCode()}
-          $bodyCode
-        }
-
-        @Override
-        public void close() throws Exception {
-          ${reuseCloseCode()}
-        }
-      }
-    """.stripMargin
-
-    GeneratedFunction(funcName, returnType, funcCode)
-  }
-
-  /**
-    * Generates a values input format that can be passed to Java compiler.
-    *
-    * @param name Class name of the input format. Must not be unique but has to be a
-    *             valid Java class identifier.
-    * @param records code for creating records
-    * @param returnType expected return type
-    * @tparam T Return type of the Flink Function.
-    * @return instance of GeneratedFunction
-    */
-  def generateValuesInputFormat[T <: Row](
-      name: String,
-      records: Seq[String],
-      returnType: TypeInformation[T])
-    : GeneratedInput[GenericInputFormat[T], T] = {
-    val funcName = newName(name)
-
-    addReusableOutRecord(returnType)
-
-    val funcCode = j"""
-      public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
-
-        private int nextIdx = 0;
-
-        ${reuseMemberCode()}
-
-        public $funcName() throws Exception {
-          ${reuseInitCode()}
-        }
-
-        @Override
-        public boolean reachedEnd() throws java.io.IOException {
-          return nextIdx >= ${records.length};
-        }
-
-        @Override
-        public Object nextRecord(Object reuse) {
-          switch (nextIdx) {
-            ${records.zipWithIndex.map { case (r, i) =>
-              s"""
-                 |case $i:
-                 |  $r
-                 |break;
-               """.stripMargin
-            }.mkString("\n")}
-          }
-          nextIdx++;
-          return $outRecordTerm;
-        }
-      }
-    """.stripMargin
-
-    GeneratedInput(funcName, returnType, funcCode)
-  }
-
-  /**
-    * Generates a [[TableFunctionCollector]] that can be passed to Java compiler.
-    *
-    * @param name Class name of the table function collector. Must not be unique but has to be a
-    *             valid Java class identifier.
-    * @param bodyCode body code for the collector method
-    * @param collectedType The type information of the element collected by the collector
-    * @return instance of GeneratedCollector
-    */
-  def generateTableFunctionCollector(
-      name: String,
-      bodyCode: String,
-      collectedType: TypeInformation[Any])
-    : GeneratedCollector = {
-
-    val className = newName(name)
-    val input1TypeClass = boxedTypeTermForTypeInfo(input1)
-    val input2TypeClass = boxedTypeTermForTypeInfo(collectedType)
-
-    val funcCode = j"""
-      public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} {
-
-        ${reuseMemberCode()}
-
-        public $className() throws Exception {
-          ${reuseInitCode()}
-        }
-
-        @Override
-        public void collect(Object record) throws Exception {
-          super.collect(record);
-          $input1TypeClass $input1Term = ($input1TypeClass) getInput();
-          $input2TypeClass $input2Term = ($input2TypeClass) record;
-          ${reuseInputUnboxingCode()}
-          $bodyCode
-        }
-
-        @Override
-        public void close() {
-        }
-      }
-    """.stripMargin
-
-    GeneratedCollector(className, funcCode)
-  }
-
-  /**
     * Generates an expression that converts the first input (and second input) into the given type.
     * If two inputs are converted, the second input is appended. If objects or variables can
     * be reused, they will be added to reusable code sections internally. The evaluation result

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
new file mode 100644
index 0000000..70f6638
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.runtime.TableFunctionCollector
+
+
+/**
+  * A code generator for generating [[org.apache.flink.util.Collector]]s.
+  *
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input1 type information about the first input of the Function
+  * @param input2 type information about the second input if the Function is binary
+  * @param input1FieldMapping additional mapping information for input1
+  *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
+  * @param input2FieldMapping additional mapping information for input2
+  *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
+  */
+class CollectorCodeGenerator(
+    config: TableConfig,
+    nullableInput: Boolean,
+    input1: TypeInformation[_ <: Any],
+    input2: Option[TypeInformation[_ <: Any]] = None,
+    input1FieldMapping: Option[Array[Int]] = None,
+    input2FieldMapping: Option[Array[Int]] = None)
+  extends CodeGenerator(
+    config,
+    nullableInput,
+    input1,
+    input2,
+    input1FieldMapping,
+    input2FieldMapping) {
+
+  /**
+    * Generates a [[TableFunctionCollector]] that can be passed to Java compiler.
+    *
+    * @param name Class name of the table function collector. Must not be unique but has to be a
+    *             valid Java class identifier.
+    * @param bodyCode body code for the collector method
+    * @param collectedType The type information of the element collected by the collector
+    * @return instance of GeneratedCollector
+    */
+  def generateTableFunctionCollector(
+    name: String,
+    bodyCode: String,
+    collectedType: TypeInformation[Any])
+  : GeneratedCollector = {
+
+    val className = newName(name)
+    val input1TypeClass = boxedTypeTermForTypeInfo(input1)
+    val input2TypeClass = boxedTypeTermForTypeInfo(collectedType)
+
+    val funcCode = j"""
+      public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} {
+
+        ${reuseMemberCode()}
+
+        public $className() throws Exception {
+          ${reuseInitCode()}
+        }
+
+        @Override
+        public void collect(Object record) throws Exception {
+          super.collect(record);
+          $input1TypeClass $input1Term = ($input1TypeClass) getInput();
+          $input2TypeClass $input2Term = ($input2TypeClass) record;
+          ${reuseInputUnboxingCode()}
+          $bodyCode
+        }
+
+        @Override
+        public void close() {
+        }
+      }
+    """.stripMargin
+
+    GeneratedCollector(className, funcCode)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index b7e1335..cf36417 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -33,7 +33,7 @@ import org.apache.flink.types.Row
 import scala.collection.JavaConverters._
 
 /**
-  * Evaluates constant expressions using Flink's [[CodeGenerator]].
+  * Evaluates constant expressions using Flink's [[FunctionCodeGenerator]].
   */
 class ExpressionReducer(config: TableConfig)
   extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
@@ -77,7 +77,7 @@ class ExpressionReducer(config: TableConfig)
     val resultType = new RowTypeInfo(literalTypes: _*)
 
     // generate MapFunction
-    val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
+    val generator = new FunctionCodeGenerator(config, false, EMPTY_ROW_INFO)
 
     val result = generator.generateResultExpression(
       resultType,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
new file mode 100644
index 0000000..e86c4ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.codegen.Indenter.toISC
+
+/**
+  * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s.
+  * Including [[MapFunction]], [[FlatMapFunction]], [[FlatJoinFunction]], [[ProcessFunction]].
+  *
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input1 type information about the first input of the Function
+  * @param input2 type information about the second input if the Function is binary
+  * @param input1FieldMapping additional mapping information for input1
+  *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
+  * @param input2FieldMapping additional mapping information for input2
+  *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
+  */
+class FunctionCodeGenerator(
+    config: TableConfig,
+    nullableInput: Boolean,
+    input1: TypeInformation[_ <: Any],
+    input2: Option[TypeInformation[_ <: Any]] = None,
+    input1FieldMapping: Option[Array[Int]] = None,
+    input2FieldMapping: Option[Array[Int]] = None)
+  extends CodeGenerator(
+    config,
+    nullableInput,
+    input1,
+    input2,
+    input1FieldMapping,
+    input2FieldMapping) {
+
+  /**
+    * A code generator for generating unary Flink
+    * [[org.apache.flink.api.common.functions.Function]]s with one input.
+    *
+    * @param config configuration that determines runtime behavior
+    * @param nullableInput input(s) can be null.
+    * @param input type information about the input of the Function
+    * @param inputFieldMapping additional mapping information necessary for input
+    *   (e.g. POJO types have no deterministic field order and some input fields might not be read)
+    */
+  def this(
+    config: TableConfig,
+    nullableInput: Boolean,
+    input: TypeInformation[Any],
+    inputFieldMapping: Array[Int]) =
+    this(config, nullableInput, input, None, Some(inputFieldMapping))
+
+  /**
+    * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java
+    * compiler.
+    *
+    * @param name Class name of the Function. Must not be unique but has to be a valid Java class
+    *             identifier.
+    * @param clazz Flink Function to be generated.
+    * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
+    *                 output record can be accessed via the given term methods.
+    * @param returnType expected return type
+    * @tparam F Flink Function to be generated.
+    * @tparam T Return type of the Flink Function.
+    * @return instance of GeneratedFunction
+    */
+  def generateFunction[F <: Function, T <: Any](
+    name: String,
+    clazz: Class[F],
+    bodyCode: String,
+    returnType: TypeInformation[T])
+  : GeneratedFunction[F, T] = {
+    val funcName = newName(name)
+
+    // Janino does not support generics, that's why we need
+    // manual casting here
+    val samHeader =
+    // FlatMapFunction
+    if (clazz == classOf[FlatMapFunction[_, _]]) {
+      val baseClass = classOf[RichFlatMapFunction[_, _]]
+      val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+      (baseClass,
+        s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
+        List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+    }
+
+    // MapFunction
+    else if (clazz == classOf[MapFunction[_, _]]) {
+      val baseClass = classOf[RichMapFunction[_, _]]
+      val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+      (baseClass,
+        "Object map(Object _in1)",
+        List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+    }
+
+    // FlatJoinFunction
+    else if (clazz == classOf[FlatJoinFunction[_, _, _]]) {
+      val baseClass = classOf[RichFlatJoinFunction[_, _, _]]
+      val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
+      val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
+        throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
+      (baseClass,
+        s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
+        List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
+             s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
+    }
+
+    // ProcessFunction
+    else if (clazz == classOf[ProcessFunction[_, _]]) {
+      val baseClass = classOf[ProcessFunction[_, _]]
+      val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+      (baseClass,
+        s"void processElement(Object _in1, " +
+          s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," +
+          s"org.apache.flink.util.Collector $collectorTerm)",
+        List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+    }
+    else {
+      // TODO more functions
+      throw new CodeGenException("Unsupported Function.")
+    }
+
+    val funcCode = j"""
+      public class $funcName
+          extends ${samHeader._1.getCanonicalName} {
+
+        ${reuseMemberCode()}
+
+        public $funcName() throws Exception {
+          ${reuseInitCode()}
+        }
+
+        ${reuseConstructorCode(funcName)}
+
+        @Override
+        public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception {
+          ${reuseOpenCode()}
+        }
+
+        @Override
+        public ${samHeader._2} throws Exception {
+          ${samHeader._3.mkString("\n")}
+          ${reusePerRecordCode()}
+          ${reuseInputUnboxingCode()}
+          $bodyCode
+        }
+
+        @Override
+        public void close() throws Exception {
+          ${reuseCloseCode()}
+        }
+      }
+    """.stripMargin
+
+    GeneratedFunction(funcName, returnType, funcCode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
new file mode 100644
index 0000000..6d6e1b6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.newName
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.types.Row
+
+/**
+  * A code generator for generating Flink [[GenericInputFormat]]s.
+  *
+  * @param config configuration that determines runtime behavior
+  */
+class InputFormatCodeGenerator(
+    config: TableConfig)
+  extends CodeGenerator(config, false, new RowTypeInfo(), None, None) {
+
+
+  /**
+    * Generates a values input format that can be passed to Java compiler.
+    *
+    * @param name Class name of the input format. Must not be unique but has to be a
+    *             valid Java class identifier.
+    * @param records code for creating records
+    * @param returnType expected return type
+    * @tparam T Return type of the Flink Function.
+    * @return instance of GeneratedFunction
+    */
+  def generateValuesInputFormat[T <: Row](
+    name: String,
+    records: Seq[String],
+    returnType: TypeInformation[T])
+  : GeneratedInput[GenericInputFormat[T], T] = {
+    val funcName = newName(name)
+
+    addReusableOutRecord(returnType)
+
+    val funcCode = j"""
+      public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
+
+        private int nextIdx = 0;
+
+        ${reuseMemberCode()}
+
+        public $funcName() throws Exception {
+          ${reuseInitCode()}
+        }
+
+        @Override
+        public boolean reachedEnd() throws java.io.IOException {
+          return nextIdx >= ${records.length};
+        }
+
+        @Override
+        public Object nextRecord(Object reuse) {
+          switch (nextIdx) {
+            ${records.zipWithIndex.map { case (r, i) =>
+              s"""
+                 |case $i:
+                 |  $r
+                 |break;
+                       """.stripMargin
+            }.mkString("\n")}
+          }
+          nextIdx++;
+          return $outRecordTerm;
+        }
+      }
+    """.stripMargin
+
+    GeneratedInput(funcName, returnType, funcCode)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 9b486e4..693924e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 trait CommonCalc {
 
   private[flink] def generateFunction[T <: Function](
-      generator: CodeGenerator,
+      generator: FunctionCodeGenerator,
       ruleDescription: String,
       inputSchema: RowSchema,
       returnSchema: RowSchema,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 96e1f5e..96aaf3e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -20,12 +20,12 @@ package org.apache.flink.table.plan.nodes
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
 import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
+import org.apache.flink.api.common.functions.Function
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
 import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.codegen._
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.TableFunctionCollector
@@ -55,7 +55,7 @@ trait CommonCorrelate {
 
     val physicalRexCall = inputSchema.mapRexNode(rexCall)
 
-    val functionGenerator = new CodeGenerator(
+    val functionGenerator = new FunctionCodeGenerator(
       config,
       false,
       inputSchema.physicalTypeInfo,
@@ -123,7 +123,7 @@ trait CommonCorrelate {
     pojoFieldMapping: Option[Array[Int]])
   : GeneratedCollector = {
 
-    val generator = new CodeGenerator(
+    val generator = new CollectorCodeGenerator(
       config,
       false,
       inputSchema.physicalTypeInfo,
@@ -155,7 +155,13 @@ trait CommonCorrelate {
       //   The generated expression is discarded.
       generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle))
 
-      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping)
+      val filterGenerator = new FunctionCodeGenerator(
+        config,
+        false,
+        udtfTypeInfo,
+        None,
+        pojoFieldMapping)
+
       filterGenerator.input1Term = filterGenerator.input2Term
       val filterCondition = filterGenerator.generateExpression(condition.get)
       s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 7ce73ee..996d62c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.types.Row
 
 /**
@@ -48,7 +48,7 @@ trait CommonScan[T] {
       inputFieldMapping: Option[Array[Int]] = None)
     : GeneratedFunction[F, Row] = {
 
-    val generator = new CodeGenerator(
+    val generator = new FunctionCodeGenerator(
       config,
       false,
       inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index b53081c..37d1a51 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.runtime.aggregate.{AggregateUtil, DataSetPreAggFunction}
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
@@ -95,7 +95,7 @@ class DataSetAggregate(
 
     val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
-    val generator = new CodeGenerator(
+    val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
       inputDS.getType)

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 9a9f738..a923acc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.FlatMapRunner
@@ -86,7 +86,7 @@ class DataSetCalc(
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val generator = new CodeGenerator(config, false, inputDS.getType)
+    val generator = new FunctionCodeGenerator(config, false, inputDS.getType)
 
     val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index e6f8ca4..a6c31d3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.runtime.FlatJoinRunner
 import org.apache.flink.types.Row
 
@@ -158,7 +158,7 @@ class DataSetJoin(
       throw TableException("Null check in TableConfig must be enabled for outer joins.")
     }
 
-    val generator = new CodeGenerator(
+    val generator = new FunctionCodeGenerator(
       config,
       nullCheck,
       leftDataSet.getType,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index e92dec5..3c1c58b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
 import org.apache.flink.types.Row
 
@@ -129,7 +129,7 @@ class DataSetSingleRowJoin(
       case _ => false
     }    
     
-    val codeGenerator = new CodeGenerator(
+    val codeGenerator = new FunctionCodeGenerator(
       config,
       isOuterJoin,
       inputType1,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index 948dd27..3a4ba47 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.InputFormatCodeGenerator
 import org.apache.flink.table.runtime.io.ValuesInputFormat
 import org.apache.flink.types.Row
 
@@ -72,7 +72,7 @@ class DataSetValues(
 
     val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-    val generator = new CodeGenerator(config)
+    val generator = new InputFormatCodeGenerator(config)
 
     // generate code for every record
     val generatedRecords = getTuples.asScala.map { r =>

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 3cb872a..38de368 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
 import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -109,7 +109,7 @@ class DataSetWindowAggregate(
 
     val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val generator = new CodeGenerator(
+    val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
       inputDS.getType)
@@ -147,7 +147,7 @@ class DataSetWindowAggregate(
   }
 
   private def createEventTimeTumblingWindowDataSet(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       inputDS: DataSet[Row],
       isTimeWindow: Boolean,
       isParserCaseSensitive: Boolean): DataSet[Row] = {
@@ -210,7 +210,7 @@ class DataSetWindowAggregate(
   }
 
   private[this] def createEventTimeSessionWindowDataSet(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       inputDS: DataSet[Row],
       isParserCaseSensitive: Boolean): DataSet[Row] = {
 
@@ -352,7 +352,7 @@ class DataSetWindowAggregate(
   }
 
   private def createEventTimeSlidingWindowDataSet(
-      generator: CodeGenerator,
+      generator: AggregationCodeGenerator,
       inputDS: DataSet[Row],
       isTimeWindow: Boolean,
       size: Long,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 67c5782..d626c46 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexProgram
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonCalc
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.CRowProcessRunner
@@ -93,7 +93,7 @@ class DataStreamCalc(
     val inputDataStream =
       getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
 
-    val generator = new CodeGenerator(config, false, inputSchema.physicalTypeInfo)
+    val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo)
 
     val genFunction = generateFunction(
       generator,

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 33bb8cc..12694fc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -18,13 +18,12 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.schema.RowSchema
@@ -121,7 +120,7 @@ class DataStreamGroupAggregate(
 
     val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
 
-    val generator = new CodeGenerator(
+    val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
       inputSchema.physicalTypeInfo)

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index d860cbe..c4ffdb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow}
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -162,7 +162,7 @@ class DataStreamGroupWindowAggregate(
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
 
-    val generator = new CodeGenerator(
+    val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
       inputSchema.physicalTypeInfo)

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index c03dac6..34a7fd8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.OverAggregate
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.api.java.functions.NullByteKeySelector
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -131,7 +131,7 @@ class DataStreamOverAggregate(
         "excessive state size. You may specify a retention time of 0 to not clean up the state.")
     }
 
-    val generator = new CodeGenerator(
+    val generator = new AggregationCodeGenerator(
       tableEnv.getConfig,
       false,
       inputSchema.physicalTypeInfo)
@@ -200,7 +200,7 @@ class DataStreamOverAggregate(
 
   def createUnboundedAndCurrentRowOverWindow(
     queryConfig: StreamQueryConfig,
-    generator: CodeGenerator,
+    generator: AggregationCodeGenerator,
     inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
     isRowsClause: Boolean): DataStream[CRow] = {
@@ -252,7 +252,7 @@ class DataStreamOverAggregate(
 
   def createBoundedAndCurrentRowOverWindow(
     queryConfig: StreamQueryConfig,
-    generator: CodeGenerator,
+    generator: AggregationCodeGenerator,
     inputDS: DataStream[CRow],
     isRowTimeType: Boolean,
     isRowsClause: Boolean): DataStream[CRow] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index d7c490f..1476681 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.InputFormatCodeGenerator
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.io.CRowValuesInputFormat
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -63,7 +63,7 @@ class DataStreamValues(
     val config = tableEnv.getConfig
 
     val returnType = CRowTypeInfo(schema.physicalTypeInfo)
-    val generator = new CodeGenerator(config)
+    val generator = new InputFormatCodeGenerator(config)
 
     // generate code for every record
     val generatedRecords = getTuples.asScala.map { r =>