You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/07/17 10:12:28 UTC
[2/2] flink git commit: [FLINK-6887] [table] Split up CodeGenerator into several specific CodeGenerator
[FLINK-6887] [table] Split up CodeGenerator into several specific CodeGenerator
This closes #4171.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/527e7499
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/527e7499
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/527e7499
Branch: refs/heads/master
Commit: 527e7499c5807be138d1ba0a278917190a7d4cf1
Parents: ef0653a
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Jun 23 16:29:20 2017 +0800
Committer: Jark Wu <ja...@apache.org>
Committed: Mon Jul 17 16:34:31 2017 +0800
----------------------------------------------------------------------
.../flink/table/api/TableEnvironment.scala | 4 +-
.../codegen/AggregationCodeGenerator.scala | 436 +++++++++++++
.../flink/table/codegen/CodeGenerator.scala | 641 +------------------
.../table/codegen/CollectorCodeGenerator.scala | 100 +++
.../flink/table/codegen/ExpressionReducer.scala | 4 +-
.../table/codegen/FunctionCodeGenerator.scala | 177 +++++
.../codegen/InputFormatCodeGenerator.scala | 92 +++
.../flink/table/plan/nodes/CommonCalc.scala | 4 +-
.../table/plan/nodes/CommonCorrelate.scala | 16 +-
.../flink/table/plan/nodes/CommonScan.scala | 4 +-
.../plan/nodes/dataset/DataSetAggregate.scala | 4 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 4 +-
.../table/plan/nodes/dataset/DataSetJoin.scala | 4 +-
.../nodes/dataset/DataSetSingleRowJoin.scala | 4 +-
.../plan/nodes/dataset/DataSetValues.scala | 4 +-
.../nodes/dataset/DataSetWindowAggregate.scala | 10 +-
.../plan/nodes/datastream/DataStreamCalc.scala | 4 +-
.../datastream/DataStreamGroupAggregate.scala | 5 +-
.../DataStreamGroupWindowAggregate.scala | 6 +-
.../datastream/DataStreamOverAggregate.scala | 8 +-
.../nodes/datastream/DataStreamValues.scala | 4 +-
.../table/runtime/aggregate/AggregateUtil.scala | 22 +-
.../expressions/utils/ExpressionTestBase.scala | 4 +-
23 files changed, 872 insertions(+), 689 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 48e33cc..252b0eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, ExpressionReducer, GeneratedFunction}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.AggregateFunction
@@ -790,7 +790,7 @@ abstract class TableEnvironment(val config: TableConfig) {
}
// code generate MapFunction
- val generator = new CodeGenerator(
+ val generator = new FunctionCodeGenerator(
config,
false,
inputTypeInfo,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
new file mode 100644
index 0000000..680eb44
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import java.lang.reflect.ParameterizedType
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.codegen.CodeGenUtils.newName
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, SingleElementIterable}
+
+/**
+ * A code generator for generating [[GeneratedAggregations]].
+ *
+ * @param config configuration that determines runtime behavior
+ * @param nullableInput input(s) can be null.
+ * @param input type information about the input of the Function
+ */
+class AggregationCodeGenerator(
+ config: TableConfig,
+ nullableInput: Boolean,
+ input: TypeInformation[_ <: Any])
+ extends CodeGenerator(config, nullableInput, input) {
+
+ /**
+ * Generates a [[org.apache.flink.table.runtime.aggregate.GeneratedAggregations]] that can be
+ * passed to a Java compiler.
+ *
+ * @param name Class name of the function.
+ * Does not need to be unique but has to be a valid Java class identifier.
+ * @param generator The code generator instance
+ * @param physicalInputTypes Physical input row types
+ * @param aggregates All aggregate functions
+ * @param aggFields Indexes of the input fields for all aggregate functions
+ * @param aggMapping The mapping of aggregates to output fields
+ * @param partialResults A flag defining whether final or partial results (accumulators) are set
+ * to the output row.
+ * @param fwdMapping The mapping of input fields to output fields
+ * @param mergeMapping An optional mapping to specify the accumulators to merge. If not set, we
+ * assume that both rows have the accumulators at the same position.
+ * @param constantFlags An optional parameter to define where to set constant boolean flags in
+ * the output row.
+ * @param outputArity The number of fields in the output row.
+ * @param needRetract a flag to indicate if the aggregate needs the retract method
+ * @param needMerge a flag to indicate if the aggregate needs the merge method
+ * @param needReset a flag to indicate if the aggregate needs the resetAccumulator method
+ *
+ * @return A GeneratedAggregationsFunction
+ */
+ def generateAggregations(
+ name: String,
+ generator: CodeGenerator,
+ physicalInputTypes: Seq[TypeInformation[_]],
+ aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
+ aggFields: Array[Array[Int]],
+ aggMapping: Array[Int],
+ partialResults: Boolean,
+ fwdMapping: Array[Int],
+ mergeMapping: Option[Array[Int]],
+ constantFlags: Option[Array[(Int, Boolean)]],
+ outputArity: Int,
+ needRetract: Boolean,
+ needMerge: Boolean,
+ needReset: Boolean)
+ : GeneratedAggregationsFunction = {
+
+ // get unique function name
+ val funcName = newName(name)
+ // register UDAGGs
+ val aggs = aggregates.map(a => generator.addReusableFunction(a))
+ // get java types of accumulators
+ val accTypeClasses = aggregates.map { a =>
+ a.getClass.getMethod("createAccumulator").getReturnType
+ }
+ val accTypes = accTypeClasses.map(_.getCanonicalName)
+
+ // get java classes of input fields
+ val javaClasses = physicalInputTypes.map(t => t.getTypeClass)
+ // get parameter lists for aggregation functions
+ val parameters = aggFields.map { inFields =>
+ val fields = for (f <- inFields) yield
+ s"(${javaClasses(f).getCanonicalName}) input.getField($f)"
+ fields.mkString(", ")
+ }
+ val methodSignaturesList = aggFields.map {
+ inFields => for (f <- inFields) yield javaClasses(f)
+ }
+
+ // check and validate the needed methods
+ aggregates.zipWithIndex.map {
+ case (a, i) => {
+ getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
+ .getOrElse(
+ throw new CodeGenException(
+ s"No matching accumulate method found for AggregateFunction " +
+ s"'${a.getClass.getCanonicalName}'" +
+ s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
+ )
+
+ if (needRetract) {
+ getUserDefinedMethod(a, "retract", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
+ .getOrElse(
+ throw new CodeGenException(
+ s"No matching retract method found for AggregateFunction " +
+ s"'${a.getClass.getCanonicalName}'" +
+ s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
+ )
+ }
+
+ if (needMerge) {
+ val methods =
+ getUserDefinedMethod(a, "merge", Array(accTypeClasses(i), classOf[JIterable[Any]]))
+ .getOrElse(
+ throw new CodeGenException(
+ s"No matching merge method found for AggregateFunction " +
+ s"${a.getClass.getCanonicalName}'.")
+ )
+
+ var iterableTypeClass = methods.getGenericParameterTypes.apply(1)
+ .asInstanceOf[ParameterizedType].getActualTypeArguments.apply(0)
+ // further extract iterableTypeClass if the accumulator has generic type
+ iterableTypeClass match {
+ case impl: ParameterizedType => iterableTypeClass = impl.getRawType
+ case _ =>
+ }
+
+ if (iterableTypeClass != accTypeClasses(i)) {
+ throw new CodeGenException(
+ s"merge method in AggregateFunction ${a.getClass.getCanonicalName} does not have " +
+ s"the correct Iterable type. Actually: ${iterableTypeClass.toString}. " +
+ s"Expected: ${accTypeClasses(i).toString}")
+ }
+ }
+
+ if (needReset) {
+ getUserDefinedMethod(a, "resetAccumulator", Array(accTypeClasses(i)))
+ .getOrElse(
+ throw new CodeGenException(
+ s"No matching resetAccumulator method found for " +
+ s"aggregate ${a.getClass.getCanonicalName}'.")
+ )
+ }
+ }
+ }
+
+ def genSetAggregationResults: String = {
+
+ val sig: String =
+ j"""
+ | public final void setAggregationResults(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row output)""".stripMargin
+
+ val setAggs: String = {
+ for (i <- aggs.indices) yield
+
+ if (partialResults) {
+ j"""
+ | output.setField(
+ | ${aggMapping(i)},
+ | (${accTypes(i)}) accs.getField($i));""".stripMargin
+ } else {
+ j"""
+ | org.apache.flink.table.functions.AggregateFunction baseClass$i =
+ | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
+ |
+ | output.setField(
+ | ${aggMapping(i)},
+ | baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin
+ }
+ }.mkString("\n")
+
+ j"""
+ |$sig {
+ |$setAggs
+ | }""".stripMargin
+ }
+
+ def genAccumulate: String = {
+
+ val sig: String =
+ j"""
+ | public final void accumulate(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input)""".stripMargin
+
+ val accumulate: String = {
+ for (i <- aggs.indices) yield
+ j"""
+ | ${aggs(i)}.accumulate(
+ | ((${accTypes(i)}) accs.getField($i)),
+ | ${parameters(i)});""".stripMargin
+ }.mkString("\n")
+
+ j"""$sig {
+ |$accumulate
+ | }""".stripMargin
+ }
+
+ def genRetract: String = {
+
+ val sig: String =
+ j"""
+ | public final void retract(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input)""".stripMargin
+
+ val retract: String = {
+ for (i <- aggs.indices) yield
+ j"""
+ | ${aggs(i)}.retract(
+ | ((${accTypes(i)}) accs.getField($i)),
+ | ${parameters(i)});""".stripMargin
+ }.mkString("\n")
+
+ if (needRetract) {
+ j"""
+ |$sig {
+ |$retract
+ | }""".stripMargin
+ } else {
+ j"""
+ |$sig {
+ | }""".stripMargin
+ }
+ }
+
+ def genCreateAccumulators: String = {
+
+ val sig: String =
+ j"""
+ | public final org.apache.flink.types.Row createAccumulators()
+ | """.stripMargin
+ val init: String =
+ j"""
+ | org.apache.flink.types.Row accs =
+ | new org.apache.flink.types.Row(${aggs.length});"""
+ .stripMargin
+ val create: String = {
+ for (i <- aggs.indices) yield
+ j"""
+ | accs.setField(
+ | $i,
+ | ${aggs(i)}.createAccumulator());"""
+ .stripMargin
+ }.mkString("\n")
+ val ret: String =
+ j"""
+ | return accs;"""
+ .stripMargin
+
+ j"""$sig {
+ |$init
+ |$create
+ |$ret
+ | }""".stripMargin
+ }
+
+ def genSetForwardedFields: String = {
+
+ val sig: String =
+ j"""
+ | public final void setForwardedFields(
+ | org.apache.flink.types.Row input,
+ | org.apache.flink.types.Row output)
+ | """.stripMargin
+
+ val forward: String = {
+ for (i <- fwdMapping.indices if fwdMapping(i) >= 0) yield
+ {
+ j"""
+ | output.setField(
+ | $i,
+ | input.getField(${fwdMapping(i)}));"""
+ .stripMargin
+ }
+ }.mkString("\n")
+
+ j"""$sig {
+ |$forward
+ | }""".stripMargin
+ }
+
+ def genSetConstantFlags: String = {
+
+ val sig: String =
+ j"""
+ | public final void setConstantFlags(org.apache.flink.types.Row output)
+ | """.stripMargin
+
+ val setFlags: String = if (constantFlags.isDefined) {
+ {
+ for (cf <- constantFlags.get) yield {
+ j"""
+ | output.setField(${cf._1}, ${if (cf._2) "true" else "false"});"""
+ .stripMargin
+ }
+ }.mkString("\n")
+ } else {
+ ""
+ }
+
+ j"""$sig {
+ |$setFlags
+ | }""".stripMargin
+ }
+
+ def genCreateOutputRow: String = {
+ j"""
+ | public final org.apache.flink.types.Row createOutputRow() {
+ | return new org.apache.flink.types.Row($outputArity);
+ | }""".stripMargin
+ }
+
+ def genMergeAccumulatorsPair: String = {
+
+ val mapping = mergeMapping.getOrElse(aggs.indices.toArray)
+
+ val sig: String =
+ j"""
+ | public final org.apache.flink.types.Row mergeAccumulatorsPair(
+ | org.apache.flink.types.Row a,
+ | org.apache.flink.types.Row b)
+ """.stripMargin
+ val merge: String = {
+ for (i <- aggs.indices) yield
+ j"""
+ | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
+ | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
+ | accIt$i.setElement(bAcc$i);
+ | ${aggs(i)}.merge(aAcc$i, accIt$i);
+ | a.setField($i, aAcc$i);
+ """.stripMargin
+ }.mkString("\n")
+ val ret: String =
+ j"""
+ | return a;
+ """.stripMargin
+
+ if (needMerge) {
+ j"""
+ |$sig {
+ |$merge
+ |$ret
+ | }""".stripMargin
+ } else {
+ j"""
+ |$sig {
+ |$ret
+ | }""".stripMargin
+ }
+ }
+
+ def genMergeList: String = {
+ {
+ val singleIterableClass = classOf[SingleElementIterable[_]].getCanonicalName
+ for (i <- accTypes.indices) yield
+ j"""
+ | private final $singleIterableClass<${accTypes(i)}> accIt$i =
+ | new $singleIterableClass<${accTypes(i)}>();
+ """.stripMargin
+ }.mkString("\n")
+ }
+
+ def genResetAccumulator: String = {
+
+ val sig: String =
+ j"""
+ | public final void resetAccumulator(
+ | org.apache.flink.types.Row accs)""".stripMargin
+
+ val reset: String = {
+ for (i <- aggs.indices) yield
+ j"""
+ | ${aggs(i)}.resetAccumulator(
+ | ((${accTypes(i)}) accs.getField($i)));""".stripMargin
+ }.mkString("\n")
+
+ if (needReset) {
+ j"""$sig {
+ |$reset
+ | }""".stripMargin
+ } else {
+ j"""$sig {
+ | }""".stripMargin
+ }
+ }
+
+ val generatedAggregationsClass = classOf[GeneratedAggregations].getCanonicalName
+ var funcCode =
+ j"""
+ |public final class $funcName extends $generatedAggregationsClass {
+ |
+ | ${reuseMemberCode()}
+ | $genMergeList
+ | public $funcName() throws Exception {
+ | ${reuseInitCode()}
+ | }
+ | ${reuseConstructorCode(funcName)}
+ |
+ """.stripMargin
+
+ funcCode += genSetAggregationResults + "\n"
+ funcCode += genAccumulate + "\n"
+ funcCode += genRetract + "\n"
+ funcCode += genCreateAccumulators + "\n"
+ funcCode += genSetForwardedFields + "\n"
+ funcCode += genSetConstantFlags + "\n"
+ funcCode += genCreateOutputRow + "\n"
+ funcCode += genMergeAccumulatorsPair + "\n"
+ funcCode += genResetAccumulator + "\n"
+ funcCode += "}"
+
+ GeneratedAggregationsFunction(funcName, funcCode)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index e8bcdcf..af16b51 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -18,8 +18,6 @@
package org.apache.flink.table.codegen
-import java.lang.reflect.ParameterizedType
-import java.lang.{Iterable => JIterable}
import java.math.{BigDecimal => JBigDecimal}
import org.apache.calcite.avatica.util.DateTimeUtils
@@ -28,25 +26,21 @@ import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
-import org.apache.flink.table.codegen.Indenter.toISC
import org.apache.flink.table.codegen.calls.{BuiltInMethods, FunctionGenerator}
import org.apache.flink.table.codegen.calls.ScalarOperators._
import org.apache.flink.table.functions.sql.ScalarSqlFunctions
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
-import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
-import org.apache.flink.table.runtime.TableFunctionCollector
+import org.apache.flink.table.functions.{FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
import org.apache.flink.table.typeutils.TypeCheckUtils._
import org.apache.flink.types.Row
@@ -54,7 +48,9 @@ import scala.collection.JavaConversions._
import scala.collection.mutable
/**
- * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s.
+ * [[CodeGenerator]] is the base code generator for generating Flink
+ * [[org.apache.flink.api.common.functions.Function]]s.
+ * It is responsible for expression generation and tracks the context (member variables etc).
*
* @param config configuration that determines runtime behavior
* @param nullableInput input(s) can be null.
@@ -65,7 +61,7 @@ import scala.collection.mutable
* @param input2FieldMapping additional mapping information for input2
* (e.g. POJO types have no deterministic field order and some input fields might not be read)
*/
-class CodeGenerator(
+abstract class CodeGenerator(
config: TableConfig,
nullableInput: Boolean,
input1: TypeInformation[_ <: Any],
@@ -95,12 +91,12 @@ class CodeGenerator(
case _ => // ok
}
- private val input1Mapping = input1FieldMapping match {
+ protected val input1Mapping: Array[Int] = input1FieldMapping match {
case Some(mapping) => mapping
case _ => (0 until input1.getArity).toArray
}
- private val input2Mapping = input2FieldMapping match {
+ protected val input2Mapping: Array[Int] = input2FieldMapping match {
case Some(mapping) => mapping
case _ => input2 match {
case Some(input) => (0 until input.getArity).toArray
@@ -108,31 +104,6 @@ class CodeGenerator(
}
}
- /**
- * A code generator for generating unary Flink
- * [[org.apache.flink.api.common.functions.Function]]s with one input.
- *
- * @param config configuration that determines runtime behavior
- * @param nullableInput input(s) can be null.
- * @param input type information about the input of the Function
- * @param inputFieldMapping additional mapping information necessary for input
- * (e.g. POJO types have no deterministic field order and some input fields might not be read)
- */
- def this(
- config: TableConfig,
- nullableInput: Boolean,
- input: TypeInformation[Any],
- inputFieldMapping: Array[Int]) =
- this(config, nullableInput, input, None, Some(inputFieldMapping))
-
- /**
- * A code generator for generating Flink input formats.
- *
- * @param config configuration that determines runtime behavior
- */
- def this(config: TableConfig) =
- this(config, false, new RowTypeInfo(), None, None)
-
// set of member statements that will be added only once
// we use a LinkedHashSet to keep the insertion order
private val reusableMemberStatements = mutable.LinkedHashSet[String]()
@@ -261,604 +232,6 @@ class CodeGenerator(
}
/**
- * Generates a [[org.apache.flink.table.runtime.aggregate.GeneratedAggregations]] that can be
- * passed to a Java compiler.
- *
- * @param name Class name of the function.
- * Does not need to be unique but has to be a valid Java class identifier.
- * @param generator The code generator instance
- * @param physicalInputTypes Physical input row types
- * @param aggregates All aggregate functions
- * @param aggFields Indexes of the input fields for all aggregate functions
- * @param aggMapping The mapping of aggregates to output fields
- * @param partialResults A flag defining whether final or partial results (accumulators) are set
- * to the output row.
- * @param fwdMapping The mapping of input fields to output fields
- * @param mergeMapping An optional mapping to specify the accumulators to merge. If not set, we
- * assume that both rows have the accumulators at the same position.
- * @param constantFlags An optional parameter to define where to set constant boolean flags in
- * the output row.
- * @param outputArity The number of fields in the output row.
- * @param needRetract a flag to indicate if the aggregate needs the retract method
- * @param needMerge a flag to indicate if the aggregate needs the merge method
- * @param needReset a flag to indicate if the aggregate needs the resetAccumulator method
- *
- * @return A GeneratedAggregationsFunction
- */
- def generateAggregations(
- name: String,
- generator: CodeGenerator,
- physicalInputTypes: Seq[TypeInformation[_]],
- aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
- aggFields: Array[Array[Int]],
- aggMapping: Array[Int],
- partialResults: Boolean,
- fwdMapping: Array[Int],
- mergeMapping: Option[Array[Int]],
- constantFlags: Option[Array[(Int, Boolean)]],
- outputArity: Int,
- needRetract: Boolean,
- needMerge: Boolean,
- needReset: Boolean)
- : GeneratedAggregationsFunction = {
-
- // get unique function name
- val funcName = newName(name)
- // register UDAGGs
- val aggs = aggregates.map(a => generator.addReusableFunction(a))
- // get java types of accumulators
- val accTypeClasses = aggregates.map { a =>
- a.getClass.getMethod("createAccumulator").getReturnType
- }
- val accTypes = accTypeClasses.map(_.getCanonicalName)
-
- // get java classes of input fields
- val javaClasses = physicalInputTypes.map(t => t.getTypeClass)
- // get parameter lists for aggregation functions
- val parameters = aggFields.map { inFields =>
- val fields = for (f <- inFields) yield
- s"(${javaClasses(f).getCanonicalName}) input.getField($f)"
- fields.mkString(", ")
- }
- val methodSignaturesList = aggFields.map {
- inFields => for (f <- inFields) yield javaClasses(f)
- }
-
- // check and validate the needed methods
- aggregates.zipWithIndex.map {
- case (a, i) => {
- getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
- .getOrElse(
- throw new CodeGenException(
- s"No matching accumulate method found for AggregateFunction " +
- s"'${a.getClass.getCanonicalName}'" +
- s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
- )
-
- if (needRetract) {
- getUserDefinedMethod(a, "retract", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
- .getOrElse(
- throw new CodeGenException(
- s"No matching retract method found for AggregateFunction " +
- s"'${a.getClass.getCanonicalName}'" +
- s"with parameters '${signatureToString(methodSignaturesList(i))}'.")
- )
- }
-
- if (needMerge) {
- val methods =
- getUserDefinedMethod(a, "merge", Array(accTypeClasses(i), classOf[JIterable[Any]]))
- .getOrElse(
- throw new CodeGenException(
- s"No matching merge method found for AggregateFunction " +
- s"${a.getClass.getCanonicalName}'.")
- )
-
- var iterableTypeClass = methods.getGenericParameterTypes.apply(1)
- .asInstanceOf[ParameterizedType].getActualTypeArguments.apply(0)
- // further extract iterableTypeClass if the accumulator has generic type
- iterableTypeClass match {
- case impl: ParameterizedType => iterableTypeClass = impl.getRawType
- case _ =>
- }
-
- if (iterableTypeClass != accTypeClasses(i)) {
- throw new CodeGenException(
- s"merge method in AggregateFunction ${a.getClass.getCanonicalName} does not have " +
- s"the correct Iterable type. Actually: ${iterableTypeClass.toString}. " +
- s"Expected: ${accTypeClasses(i).toString}")
- }
- }
-
- if (needReset) {
- getUserDefinedMethod(a, "resetAccumulator", Array(accTypeClasses(i)))
- .getOrElse(
- throw new CodeGenException(
- s"No matching resetAccumulator method found for " +
- s"aggregate ${a.getClass.getCanonicalName}'.")
- )
- }
- }
- }
-
- def genSetAggregationResults: String = {
-
- val sig: String =
- j"""
- | public final void setAggregationResults(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row output)""".stripMargin
-
- val setAggs: String = {
- for (i <- aggs.indices) yield
-
- if (partialResults) {
- j"""
- | output.setField(
- | ${aggMapping(i)},
- | (${accTypes(i)}) accs.getField($i));""".stripMargin
- } else {
- j"""
- | org.apache.flink.table.functions.AggregateFunction baseClass$i =
- | (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
- |
- | output.setField(
- | ${aggMapping(i)},
- | baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin
- }
- }.mkString("\n")
-
- j"""
- |$sig {
- |$setAggs
- | }""".stripMargin
- }
-
- def genAccumulate: String = {
-
- val sig: String =
- j"""
- | public final void accumulate(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row input)""".stripMargin
-
- val accumulate: String = {
- for (i <- aggs.indices) yield
- j"""
- | ${aggs(i)}.accumulate(
- | ((${accTypes(i)}) accs.getField($i)),
- | ${parameters(i)});""".stripMargin
- }.mkString("\n")
-
- j"""$sig {
- |$accumulate
- | }""".stripMargin
- }
-
- def genRetract: String = {
-
- val sig: String =
- j"""
- | public final void retract(
- | org.apache.flink.types.Row accs,
- | org.apache.flink.types.Row input)""".stripMargin
-
- val retract: String = {
- for (i <- aggs.indices) yield
- j"""
- | ${aggs(i)}.retract(
- | ((${accTypes(i)}) accs.getField($i)),
- | ${parameters(i)});""".stripMargin
- }.mkString("\n")
-
- if (needRetract) {
- j"""
- |$sig {
- |$retract
- | }""".stripMargin
- } else {
- j"""
- |$sig {
- | }""".stripMargin
- }
- }
-
- def genCreateAccumulators: String = {
-
- val sig: String =
- j"""
- | public final org.apache.flink.types.Row createAccumulators()
- | """.stripMargin
- val init: String =
- j"""
- | org.apache.flink.types.Row accs =
- | new org.apache.flink.types.Row(${aggs.length});"""
- .stripMargin
- val create: String = {
- for (i <- aggs.indices) yield
- j"""
- | accs.setField(
- | $i,
- | ${aggs(i)}.createAccumulator());"""
- .stripMargin
- }.mkString("\n")
- val ret: String =
- j"""
- | return accs;"""
- .stripMargin
-
- j"""$sig {
- |$init
- |$create
- |$ret
- | }""".stripMargin
- }
-
- def genSetForwardedFields: String = {
-
- val sig: String =
- j"""
- | public final void setForwardedFields(
- | org.apache.flink.types.Row input,
- | org.apache.flink.types.Row output)
- | """.stripMargin
-
- val forward: String = {
- for (i <- fwdMapping.indices if fwdMapping(i) >= 0) yield
- {
- j"""
- | output.setField(
- | $i,
- | input.getField(${fwdMapping(i)}));"""
- .stripMargin
- }
- }.mkString("\n")
-
- j"""$sig {
- |$forward
- | }""".stripMargin
- }
-
- def genSetConstantFlags: String = {
-
- val sig: String =
- j"""
- | public final void setConstantFlags(org.apache.flink.types.Row output)
- | """.stripMargin
-
- val setFlags: String = if (constantFlags.isDefined) {
- {
- for (cf <- constantFlags.get) yield {
- j"""
- | output.setField(${cf._1}, ${if (cf._2) "true" else "false"});"""
- .stripMargin
- }
- }.mkString("\n")
- } else {
- ""
- }
-
- j"""$sig {
- |$setFlags
- | }""".stripMargin
- }
-
- def genCreateOutputRow: String = {
- j"""
- | public final org.apache.flink.types.Row createOutputRow() {
- | return new org.apache.flink.types.Row($outputArity);
- | }""".stripMargin
- }
-
- def genMergeAccumulatorsPair: String = {
-
- val mapping = mergeMapping.getOrElse(aggs.indices.toArray)
-
- val sig: String =
- j"""
- | public final org.apache.flink.types.Row mergeAccumulatorsPair(
- | org.apache.flink.types.Row a,
- | org.apache.flink.types.Row b)
- """.stripMargin
- val merge: String = {
- for (i <- aggs.indices) yield
- j"""
- | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
- | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
- | accIt$i.setElement(bAcc$i);
- | ${aggs(i)}.merge(aAcc$i, accIt$i);
- | a.setField($i, aAcc$i);
- """.stripMargin
- }.mkString("\n")
- val ret: String =
- j"""
- | return a;
- """.stripMargin
-
- if (needMerge) {
- j"""
- |$sig {
- |$merge
- |$ret
- | }""".stripMargin
- } else {
- j"""
- |$sig {
- |$ret
- | }""".stripMargin
- }
- }
-
- def genMergeList: String = {
- {
- val singleIterableClass = "org.apache.flink.table.runtime.aggregate.SingleElementIterable"
- for (i <- accTypes.indices) yield
- j"""
- | private final $singleIterableClass<${accTypes(i)}> accIt$i =
- | new $singleIterableClass<${accTypes(i)}>();
- """.stripMargin
- }.mkString("\n")
- }
-
- def genResetAccumulator: String = {
-
- val sig: String =
- j"""
- | public final void resetAccumulator(
- | org.apache.flink.types.Row accs)""".stripMargin
-
- val reset: String = {
- for (i <- aggs.indices) yield
- j"""
- | ${aggs(i)}.resetAccumulator(
- | ((${accTypes(i)}) accs.getField($i)));""".stripMargin
- }.mkString("\n")
-
- if (needReset) {
- j"""$sig {
- |$reset
- | }""".stripMargin
- } else {
- j"""$sig {
- | }""".stripMargin
- }
- }
-
- var funcCode =
- j"""
- |public final class $funcName
- | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
- |
- | ${reuseMemberCode()}
- | $genMergeList
- | public $funcName() throws Exception {
- | ${reuseInitCode()}
- | }
- | ${reuseConstructorCode(funcName)}
- |
- """.stripMargin
-
- funcCode += genSetAggregationResults + "\n"
- funcCode += genAccumulate + "\n"
- funcCode += genRetract + "\n"
- funcCode += genCreateAccumulators + "\n"
- funcCode += genSetForwardedFields + "\n"
- funcCode += genSetConstantFlags + "\n"
- funcCode += genCreateOutputRow + "\n"
- funcCode += genMergeAccumulatorsPair + "\n"
- funcCode += genResetAccumulator + "\n"
- funcCode += "}"
-
- GeneratedAggregationsFunction(funcName, funcCode)
- }
-
- /**
- * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java
- * compiler.
- *
- * Supported values for `clazz` are `FlatMapFunction`, `MapFunction`, `FlatJoinFunction` and
- * `ProcessFunction`. The class object is compared by equality, so other classes (including
- * subclasses of the supported ones) are rejected. The generated class extends the matching
- * rich function (`ProcessFunction` is extended directly) and overrides `open`, the SAM and
- * `close`.
- *
- * @param name Class name of the Function. Does not have to be unique (the final class name is
- * obtained from `newName`) but has to be a valid Java class identifier.
- * @param clazz Flink Function to be generated.
- * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
- * output record can be accessed via the given term methods. It is placed after
- * the input casts, the reusable per-record code and the input unboxing code.
- * @param returnType expected return type
- * @tparam F Flink Function to be generated.
- * @tparam T Return type of the Flink Function.
- * @throws CodeGenException if `clazz` is not one of the supported functions, or if a
- * `FlatJoinFunction` is requested while `input2` is empty
- * @return instance of GeneratedFunction
- */
- def generateFunction[F <: Function, T <: Any](
- name: String,
- clazz: Class[F],
- bodyCode: String,
- returnType: TypeInformation[T])
- : GeneratedFunction[F, T] = {
- val funcName = newName(name)
-
- // Janino does not support generics, so the SAM parameters are declared as Object and
- // cast manually to the boxed input type by the statements in the third tuple element
- val samHeader = // (base class to extend, SAM signature, input cast statements)
- // FlatMapFunction: extends RichFlatMapFunction; the SAM takes the input and a Collector
- if (clazz == classOf[FlatMapFunction[_, _]]) {
- val baseClass = classOf[RichFlatMapFunction[_, _]]
- val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
- (baseClass,
- s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
- List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
- }
-
- // MapFunction: extends RichMapFunction; the SAM takes the input and returns an Object
- else if (clazz == classOf[MapFunction[_, _]]) {
- val baseClass = classOf[RichMapFunction[_, _]]
- val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
- (baseClass,
- "Object map(Object _in1)",
- List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
- }
-
- // FlatJoinFunction: extends RichFlatJoinFunction; needs input2, both inputs are cast
- else if (clazz == classOf[FlatJoinFunction[_, _, _]]) {
- val baseClass = classOf[RichFlatJoinFunction[_, _, _]]
- val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
- val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
- throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
- (baseClass,
- s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
- List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
- s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
- }
-
- // ProcessFunction: extended directly; the SAM takes the input, a Context and a Collector
- else if (clazz == classOf[ProcessFunction[_, _]]) {
- val baseClass = classOf[ProcessFunction[_, _]]
- val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
- (baseClass,
- s"void processElement(Object _in1, " +
- s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," +
- s"org.apache.flink.util.Collector $collectorTerm)",
- List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
- }
- else {
- // TODO more functions
- throw new CodeGenException("Unsupported Function.")
- }
- // Class template: members, constructors, open(), the SAM (casts, per-record code, unboxing, body), close().
- val funcCode = j"""
- public class $funcName
- extends ${samHeader._1.getCanonicalName} {
-
- ${reuseMemberCode()}
-
- public $funcName() throws Exception {
- ${reuseInitCode()}
- }
-
- ${reuseConstructorCode(funcName)}
-
- @Override
- public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception {
- ${reuseOpenCode()}
- }
-
- @Override
- public ${samHeader._2} throws Exception {
- ${samHeader._3.mkString("\n")}
- ${reusePerRecordCode()}
- ${reuseInputUnboxingCode()}
- $bodyCode
- }
-
- @Override
- public void close() throws Exception {
- ${reuseCloseCode()}
- }
- }
- """.stripMargin
-
- GeneratedFunction(funcName, returnType, funcCode)
- }
-
- /**
- * Generates a values input format that can be passed to Java compiler.
- *
- * The generated class keeps a `nextIdx` counter. `nextRecord` runs the code snippet of
- * `records` selected by that counter, increments the counter and returns the reusable output
- * record; `reachedEnd` reports true once the counter has reached `records.length`.
- *
- * @param name Class name of the input format. Does not have to be unique (the final class name
- * is obtained from `newName`) but has to be a valid Java class identifier.
- * @param records code for creating records, one snippet per record; the snippet at position i
- * becomes the body of `case i` in the generated `nextRecord`
- * @param returnType expected return type
- * @tparam T Type of the records produced by the generated input format.
- * @return instance of GeneratedInput
- */
- def generateValuesInputFormat[T <: Row](
- name: String,
- records: Seq[String],
- returnType: TypeInformation[T])
- : GeneratedInput[GenericInputFormat[T], T] = {
- val funcName = newName(name)
- // Must run before the template is built: the template embeds reuseMemberCode() and outRecordTerm.
- addReusableOutRecord(returnType)
-
- val funcCode = j"""
- public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
-
- private int nextIdx = 0;
-
- ${reuseMemberCode()}
-
- public $funcName() throws Exception {
- ${reuseInitCode()}
- }
-
- @Override
- public boolean reachedEnd() throws java.io.IOException {
- return nextIdx >= ${records.length};
- }
-
- @Override
- public Object nextRecord(Object reuse) {
- switch (nextIdx) {
- ${records.zipWithIndex.map { case (r, i) =>
- s"""
- |case $i:
- | $r
- |break;
- """.stripMargin
- }.mkString("\n")}
- }
- nextIdx++;
- return $outRecordTerm;
- }
- }
- """.stripMargin
-
- GeneratedInput(funcName, returnType, funcCode)
- }
-
- /**
- * Generates a [[TableFunctionCollector]] that can be passed to Java compiler.
- *
- * In the generated `collect` method the value returned by `getInput()` is cast to the boxed
- * type of the first input and bound to `input1Term`, and the collected record is cast to the
- * boxed type of `collectedType` and bound to `input2Term`, before `bodyCode` runs. The
- * generated `close` method is empty.
- *
- * @param name Class name of the table function collector. Does not have to be unique (the final
- * class name is obtained from `newName`) but has to be a valid Java class identifier.
- * @param bodyCode body code for the collector method
- * @param collectedType The type information of the element collected by the collector
- * @return instance of GeneratedCollector
- */
- def generateTableFunctionCollector(
- name: String,
- bodyCode: String,
- collectedType: TypeInformation[Any])
- : GeneratedCollector = {
- // Boxed type terms are used for the casts of getInput() and of the collected record below.
- val className = newName(name)
- val input1TypeClass = boxedTypeTermForTypeInfo(input1)
- val input2TypeClass = boxedTypeTermForTypeInfo(collectedType)
-
- val funcCode = j"""
- public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} {
-
- ${reuseMemberCode()}
-
- public $className() throws Exception {
- ${reuseInitCode()}
- }
-
- @Override
- public void collect(Object record) throws Exception {
- super.collect(record);
- $input1TypeClass $input1Term = ($input1TypeClass) getInput();
- $input2TypeClass $input2Term = ($input2TypeClass) record;
- ${reuseInputUnboxingCode()}
- $bodyCode
- }
-
- @Override
- public void close() {
- }
- }
- """.stripMargin
-
- GeneratedCollector(className, funcCode)
- }
-
- /**
* Generates an expression that converts the first input (and second input) into the given type.
* If two inputs are converted, the second input is appended. If objects or variables can
* be reused, they will be added to reusable code sections internally. The evaluation result
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
new file mode 100644
index 0000000..70f6638
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.runtime.TableFunctionCollector
+
+
+/**
+ * A code generator for generating [[org.apache.flink.util.Collector]]s.
+ *
+ * All constructor arguments are passed unchanged to the parent [[CodeGenerator]]; this class
+ * adds [[generateTableFunctionCollector]] on top of it.
+ *
+ * @param config configuration that determines runtime behavior
+ * @param nullableInput input(s) can be null.
+ * @param input1 type information about the first input of the Function
+ * @param input2 type information about the second input if the Function is binary
+ * @param input1FieldMapping additional mapping information for input1
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
+ * @param input2FieldMapping additional mapping information for input2
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
+ */
+class CollectorCodeGenerator(
+ config: TableConfig,
+ nullableInput: Boolean,
+ input1: TypeInformation[_ <: Any],
+ input2: Option[TypeInformation[_ <: Any]] = None,
+ input1FieldMapping: Option[Array[Int]] = None,
+ input2FieldMapping: Option[Array[Int]] = None)
+ extends CodeGenerator(
+ config,
+ nullableInput,
+ input1,
+ input2,
+ input1FieldMapping,
+ input2FieldMapping) {
+
+ /**
+ * Generates a [[TableFunctionCollector]] that can be passed to Java compiler.
+ *
+ * In the generated `collect` method the value returned by `getInput()` is cast to the boxed
+ * type of the first input and bound to `input1Term`, and the collected record is cast to the
+ * boxed type of `collectedType` and bound to `input2Term`, before `bodyCode` runs. The
+ * generated `close` method is empty.
+ *
+ * @param name Class name of the table function collector. Does not have to be unique (the final
+ * class name is obtained from `newName`) but has to be a valid Java class identifier.
+ * @param bodyCode body code for the collector method
+ * @param collectedType The type information of the element collected by the collector
+ * @return instance of GeneratedCollector
+ */
+ def generateTableFunctionCollector(
+ name: String,
+ bodyCode: String,
+ collectedType: TypeInformation[Any])
+ : GeneratedCollector = {
+ // Boxed type terms are used for the casts of getInput() and of the collected record below.
+ val className = newName(name)
+ val input1TypeClass = boxedTypeTermForTypeInfo(input1)
+ val input2TypeClass = boxedTypeTermForTypeInfo(collectedType)
+
+ val funcCode = j"""
+ public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} {
+
+ ${reuseMemberCode()}
+
+ public $className() throws Exception {
+ ${reuseInitCode()}
+ }
+
+ @Override
+ public void collect(Object record) throws Exception {
+ super.collect(record);
+ $input1TypeClass $input1Term = ($input1TypeClass) getInput();
+ $input2TypeClass $input2Term = ($input2TypeClass) record;
+ ${reuseInputUnboxingCode()}
+ $bodyCode
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+ """.stripMargin
+
+ GeneratedCollector(className, funcCode)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index b7e1335..cf36417 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -33,7 +33,7 @@ import org.apache.flink.types.Row
import scala.collection.JavaConverters._
/**
- * Evaluates constant expressions using Flink's [[CodeGenerator]].
+ * Evaluates constant expressions using Flink's [[FunctionCodeGenerator]].
*/
class ExpressionReducer(config: TableConfig)
extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
@@ -77,7 +77,7 @@ class ExpressionReducer(config: TableConfig)
val resultType = new RowTypeInfo(literalTypes: _*)
// generate MapFunction
- val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
+ val generator = new FunctionCodeGenerator(config, false, EMPTY_ROW_INFO)
val result = generator.generateResultExpression(
resultType,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
new file mode 100644
index 0000000..e86c4ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, newName}
+import org.apache.flink.table.codegen.Indenter.toISC
+
+/**
+ * A code generator for generating Flink [[org.apache.flink.api.common.functions.Function]]s.
+ * Including [[MapFunction]], [[FlatMapFunction]], [[FlatJoinFunction]], [[ProcessFunction]].
+ *
+ * All constructor arguments are passed unchanged to the parent [[CodeGenerator]]; this class
+ * adds [[generateFunction]] on top of it.
+ *
+ * @param config configuration that determines runtime behavior
+ * @param nullableInput input(s) can be null.
+ * @param input1 type information about the first input of the Function
+ * @param input2 type information about the second input if the Function is binary
+ * @param input1FieldMapping additional mapping information for input1
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
+ * @param input2FieldMapping additional mapping information for input2
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
+ */
+class FunctionCodeGenerator(
+ config: TableConfig,
+ nullableInput: Boolean,
+ input1: TypeInformation[_ <: Any],
+ input2: Option[TypeInformation[_ <: Any]] = None,
+ input1FieldMapping: Option[Array[Int]] = None,
+ input2FieldMapping: Option[Array[Int]] = None)
+ extends CodeGenerator(
+ config,
+ nullableInput,
+ input1,
+ input2,
+ input1FieldMapping,
+ input2FieldMapping) {
+
+ /**
+ * A code generator for generating unary Flink
+ * [[org.apache.flink.api.common.functions.Function]]s with one input.
+ *
+ * Delegates to the primary constructor with no second input and with `inputFieldMapping`
+ * wrapped in `Some` as the field mapping of the first input.
+ *
+ * @param config configuration that determines runtime behavior
+ * @param nullableInput input(s) can be null.
+ * @param input type information about the input of the Function
+ * @param inputFieldMapping additional mapping information necessary for input
+ * (e.g. POJO types have no deterministic field order and some input fields might not be read)
+ */
+ def this(
+ config: TableConfig,
+ nullableInput: Boolean,
+ input: TypeInformation[Any],
+ inputFieldMapping: Array[Int]) =
+ this(config, nullableInput, input, None, Some(inputFieldMapping))
+
+ /**
+ * Generates a [[org.apache.flink.api.common.functions.Function]] that can be passed to Java
+ * compiler.
+ *
+ * Supported values for `clazz` are [[FlatMapFunction]], [[MapFunction]], [[FlatJoinFunction]]
+ * and [[ProcessFunction]]. The class object is compared by equality, so other classes
+ * (including subclasses of the supported ones) are rejected. The generated class extends the
+ * matching rich function ([[ProcessFunction]] is extended directly) and overrides `open`, the
+ * SAM and `close`.
+ *
+ * @param name Class name of the Function. Does not have to be unique (the final class name is
+ * obtained from `newName`) but has to be a valid Java class identifier.
+ * @param clazz Flink Function to be generated.
+ * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
+ * output record can be accessed via the given term methods. It is placed after
+ * the input casts, the reusable per-record code and the input unboxing code.
+ * @param returnType expected return type
+ * @tparam F Flink Function to be generated.
+ * @tparam T Return type of the Flink Function.
+ * @throws CodeGenException if `clazz` is not one of the supported functions, or if a
+ * [[FlatJoinFunction]] is requested while `input2` is empty
+ * @return instance of GeneratedFunction
+ */
+ def generateFunction[F <: Function, T <: Any](
+ name: String,
+ clazz: Class[F],
+ bodyCode: String,
+ returnType: TypeInformation[T])
+ : GeneratedFunction[F, T] = {
+ val funcName = newName(name)
+
+ // Janino does not support generics, so the SAM parameters are declared as Object and
+ // cast manually to the boxed input type by the statements in the third tuple element
+ val samHeader = // (base class to extend, SAM signature, input cast statements)
+ // FlatMapFunction: extends RichFlatMapFunction; the SAM takes the input and a Collector
+ if (clazz == classOf[FlatMapFunction[_, _]]) {
+ val baseClass = classOf[RichFlatMapFunction[_, _]]
+ val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+ (baseClass,
+ s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
+ List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+ }
+
+ // MapFunction: extends RichMapFunction; the SAM takes the input and returns an Object
+ else if (clazz == classOf[MapFunction[_, _]]) {
+ val baseClass = classOf[RichMapFunction[_, _]]
+ val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+ (baseClass,
+ "Object map(Object _in1)",
+ List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+ }
+
+ // FlatJoinFunction: extends RichFlatJoinFunction; needs input2, both inputs are cast
+ else if (clazz == classOf[FlatJoinFunction[_, _, _]]) {
+ val baseClass = classOf[RichFlatJoinFunction[_, _, _]]
+ val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
+ val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse(
+ throw new CodeGenException("Input 2 for FlatJoinFunction should not be null")))
+ (baseClass,
+ s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)",
+ List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
+ s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;"))
+ }
+
+ // ProcessFunction: extended directly; the SAM takes the input, a Context and a Collector
+ else if (clazz == classOf[ProcessFunction[_, _]]) {
+ val baseClass = classOf[ProcessFunction[_, _]]
+ val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
+ (baseClass,
+ s"void processElement(Object _in1, " +
+ s"org.apache.flink.streaming.api.functions.ProcessFunction.Context $contextTerm," +
+ s"org.apache.flink.util.Collector $collectorTerm)",
+ List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
+ }
+ else {
+ // TODO more functions
+ throw new CodeGenException("Unsupported Function.")
+ }
+ // Class template: members, constructors, open(), the SAM (casts, per-record code, unboxing, body), close().
+ val funcCode = j"""
+ public class $funcName
+ extends ${samHeader._1.getCanonicalName} {
+
+ ${reuseMemberCode()}
+
+ public $funcName() throws Exception {
+ ${reuseInitCode()}
+ }
+
+ ${reuseConstructorCode(funcName)}
+
+ @Override
+ public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception {
+ ${reuseOpenCode()}
+ }
+
+ @Override
+ public ${samHeader._2} throws Exception {
+ ${samHeader._3.mkString("\n")}
+ ${reusePerRecordCode()}
+ ${reuseInputUnboxingCode()}
+ $bodyCode
+ }
+
+ @Override
+ public void close() throws Exception {
+ ${reuseCloseCode()}
+ }
+ }
+ """.stripMargin
+
+ GeneratedFunction(funcName, returnType, funcCode)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
new file mode 100644
index 0000000..6d6e1b6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils.newName
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.types.Row
+
+/**
+ * A code generator for generating Flink [[GenericInputFormat]]s.
+ *
+ * The parent [[CodeGenerator]] is constructed with `false` for nullable input, an empty
+ * [[RowTypeInfo]] as first input, no second input and no field mapping for the first input.
+ *
+ * @param config configuration that determines runtime behavior
+ */
+class InputFormatCodeGenerator(
+ config: TableConfig)
+ extends CodeGenerator(config, false, new RowTypeInfo(), None, None) {
+
+
+ /**
+ * Generates a values input format that can be passed to Java compiler.
+ *
+ * The generated class keeps a `nextIdx` counter. `nextRecord` runs the code snippet of
+ * `records` selected by that counter, increments the counter and returns the reusable output
+ * record; `reachedEnd` reports true once the counter has reached `records.length`.
+ *
+ * @param name Class name of the input format. Does not have to be unique (the final class name
+ * is obtained from `newName`) but has to be a valid Java class identifier.
+ * @param records code for creating records, one snippet per record; the snippet at position i
+ * becomes the body of `case i` in the generated `nextRecord`
+ * @param returnType expected return type
+ * @tparam T Type of the records produced by the generated input format.
+ * @return instance of GeneratedInput
+ */
+ def generateValuesInputFormat[T <: Row](
+ name: String,
+ records: Seq[String],
+ returnType: TypeInformation[T])
+ : GeneratedInput[GenericInputFormat[T], T] = {
+ val funcName = newName(name)
+ // Must run before the template is built: the template embeds reuseMemberCode() and outRecordTerm.
+ addReusableOutRecord(returnType)
+
+ val funcCode = j"""
+ public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
+
+ private int nextIdx = 0;
+
+ ${reuseMemberCode()}
+
+ public $funcName() throws Exception {
+ ${reuseInitCode()}
+ }
+
+ @Override
+ public boolean reachedEnd() throws java.io.IOException {
+ return nextIdx >= ${records.length};
+ }
+
+ @Override
+ public Object nextRecord(Object reuse) {
+ switch (nextIdx) {
+ ${records.zipWithIndex.map { case (r, i) =>
+ s"""
+ |case $i:
+ | $r
+ |break;
+ """.stripMargin
+ }.mkString("\n")}
+ }
+ nextIdx++;
+ return $outRecordTerm;
+ }
+ }
+ """.stripMargin
+
+ GeneratedInput(funcName, returnType, funcCode)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 9b486e4..693924e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rex._
import org.apache.flink.api.common.functions.Function
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.types.Row
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
trait CommonCalc {
private[flink] def generateFunction[T <: Function](
- generator: CodeGenerator,
+ generator: FunctionCodeGenerator,
ruleDescription: String,
inputSchema: RowSchema,
returnSchema: RowSchema,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 96e1f5e..96aaf3e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -20,12 +20,12 @@ package org.apache.flink.table.plan.nodes
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
+import org.apache.flink.api.common.functions.Function
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.codegen._
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.TableFunctionCollector
@@ -55,7 +55,7 @@ trait CommonCorrelate {
val physicalRexCall = inputSchema.mapRexNode(rexCall)
- val functionGenerator = new CodeGenerator(
+ val functionGenerator = new FunctionCodeGenerator(
config,
false,
inputSchema.physicalTypeInfo,
@@ -123,7 +123,7 @@ trait CommonCorrelate {
pojoFieldMapping: Option[Array[Int]])
: GeneratedCollector = {
- val generator = new CodeGenerator(
+ val generator = new CollectorCodeGenerator(
config,
false,
inputSchema.physicalTypeInfo,
@@ -155,7 +155,13 @@ trait CommonCorrelate {
// The generated expression is discarded.
generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle))
- val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping)
+ val filterGenerator = new FunctionCodeGenerator(
+ config,
+ false,
+ udtfTypeInfo,
+ None,
+ pojoFieldMapping)
+
filterGenerator.input1Term = filterGenerator.input2Term
val filterCondition = filterGenerator.generateExpression(condition.get)
s"""
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 7ce73ee..996d62c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes
import org.apache.flink.api.common.functions.Function
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.types.Row
/**
@@ -48,7 +48,7 @@ trait CommonScan[T] {
inputFieldMapping: Option[Array[Int]] = None)
: GeneratedFunction[F, Row] = {
- val generator = new CodeGenerator(
+ val generator = new FunctionCodeGenerator(
config,
false,
inputType,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index b53081c..37d1a51 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.runtime.aggregate.{AggregateUtil, DataSetPreAggFunction}
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
@@ -95,7 +95,7 @@ class DataSetAggregate(
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
- val generator = new CodeGenerator(
+ val generator = new AggregationCodeGenerator(
tableEnv.getConfig,
false,
inputDS.getType)
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 9a9f738..a923acc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
import org.apache.flink.table.plan.nodes.CommonCalc
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.FlatMapRunner
@@ -86,7 +86,7 @@ class DataSetCalc(
val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val generator = new CodeGenerator(config, false, inputDS.getType)
+ val generator = new FunctionCodeGenerator(config, false, inputDS.getType)
val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index e6f8ca4..a6c31d3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
import org.apache.flink.table.runtime.FlatJoinRunner
import org.apache.flink.types.Row
@@ -158,7 +158,7 @@ class DataSetJoin(
throw TableException("Null check in TableConfig must be enabled for outer joins.")
}
- val generator = new CodeGenerator(
+ val generator = new FunctionCodeGenerator(
config,
nullCheck,
leftDataSet.getType,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index e92dec5..3c1c58b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
import org.apache.flink.types.Row
@@ -129,7 +129,7 @@ class DataSetSingleRowJoin(
case _ => false
}
- val codeGenerator = new CodeGenerator(
+ val codeGenerator = new FunctionCodeGenerator(
config,
isOuterJoin,
inputType1,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index 948dd27..3a4ba47 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexLiteral
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.InputFormatCodeGenerator
import org.apache.flink.table.runtime.io.ValuesInputFormat
import org.apache.flink.types.Row
@@ -72,7 +72,7 @@ class DataSetValues(
val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
- val generator = new CodeGenerator(config)
+ val generator = new InputFormatCodeGenerator(config)
// generate code for every record
val generatedRecords = getTuples.asScala.map { r =>
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 3cb872a..38de368 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.expressions.ExpressionUtils._
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -109,7 +109,7 @@ class DataSetWindowAggregate(
val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val generator = new CodeGenerator(
+ val generator = new AggregationCodeGenerator(
tableEnv.getConfig,
false,
inputDS.getType)
@@ -147,7 +147,7 @@ class DataSetWindowAggregate(
}
private def createEventTimeTumblingWindowDataSet(
- generator: CodeGenerator,
+ generator: AggregationCodeGenerator,
inputDS: DataSet[Row],
isTimeWindow: Boolean,
isParserCaseSensitive: Boolean): DataSet[Row] = {
@@ -210,7 +210,7 @@ class DataSetWindowAggregate(
}
private[this] def createEventTimeSessionWindowDataSet(
- generator: CodeGenerator,
+ generator: AggregationCodeGenerator,
inputDS: DataSet[Row],
isParserCaseSensitive: Boolean): DataSet[Row] = {
@@ -352,7 +352,7 @@ class DataSetWindowAggregate(
}
private def createEventTimeSlidingWindowDataSet(
- generator: CodeGenerator,
+ generator: AggregationCodeGenerator,
inputDS: DataSet[Row],
isTimeWindow: Boolean,
size: Long,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 67c5782..d626c46 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rex.RexProgram
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.FunctionCodeGenerator
import org.apache.flink.table.plan.nodes.CommonCalc
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.CRowProcessRunner
@@ -93,7 +93,7 @@ class DataStreamCalc(
val inputDataStream =
getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
- val generator = new CodeGenerator(config, false, inputSchema.physicalTypeInfo)
+ val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo)
val genFunction = generateFunction(
generator,
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 33bb8cc..12694fc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -18,13 +18,12 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.java.functions.NullByteKeySelector
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.plan.schema.RowSchema
@@ -121,7 +120,7 @@ class DataStreamGroupAggregate(
val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
- val generator = new CodeGenerator(
+ val generator = new AggregationCodeGenerator(
tableEnv.getConfig,
false,
inputSchema.physicalTypeInfo)
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index d860cbe..c4ffdb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow}
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.expressions.ExpressionUtils._
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -162,7 +162,7 @@ class DataStreamGroupWindowAggregate(
s"select: ($aggString)"
val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
- val generator = new CodeGenerator(
+ val generator = new AggregationCodeGenerator(
tableEnv.getConfig,
false,
inputSchema.physicalTypeInfo)
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index c03dac6..34a7fd8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -32,7 +32,7 @@ import org.apache.flink.table.plan.nodes.OverAggregate
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.api.java.functions.NullByteKeySelector
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -131,7 +131,7 @@ class DataStreamOverAggregate(
"excessive state size. You may specify a retention time of 0 to not clean up the state.")
}
- val generator = new CodeGenerator(
+ val generator = new AggregationCodeGenerator(
tableEnv.getConfig,
false,
inputSchema.physicalTypeInfo)
@@ -200,7 +200,7 @@ class DataStreamOverAggregate(
def createUnboundedAndCurrentRowOverWindow(
queryConfig: StreamQueryConfig,
- generator: CodeGenerator,
+ generator: AggregationCodeGenerator,
inputDS: DataStream[CRow],
isRowTimeType: Boolean,
isRowsClause: Boolean): DataStream[CRow] = {
@@ -252,7 +252,7 @@ class DataStreamOverAggregate(
def createBoundedAndCurrentRowOverWindow(
queryConfig: StreamQueryConfig,
- generator: CodeGenerator,
+ generator: AggregationCodeGenerator,
inputDS: DataStream[CRow],
isRowTimeType: Boolean,
isRowsClause: Boolean): DataStream[CRow] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/527e7499/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index d7c490f..1476681 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.InputFormatCodeGenerator
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.io.CRowValuesInputFormat
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -63,7 +63,7 @@ class DataStreamValues(
val config = tableEnv.getConfig
val returnType = CRowTypeInfo(schema.physicalTypeInfo)
- val generator = new CodeGenerator(config)
+ val generator = new InputFormatCodeGenerator(config)
// generate code for every record
val generatedRecords = getTuples.asScala.map { r =>