You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/09 09:37:03 UTC
[flink] branch master updated: [FLINK-12453][table-planner-blink]
Simplify constructor of AggsHandlerCodeGenerator to explicitly tell which
methods need to be generated
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 504e7be [FLINK-12453][table-planner-blink] Simplify constructor of AggsHandlerCodeGenerator to explicitly tell which methods need to be generated
504e7be is described below
commit 504e7be7dcb2cad4184f9ffd1ef20d8b13f64f40
Author: Jark Wu <im...@gmail.com>
AuthorDate: Thu May 9 14:16:50 2019 +0800
[FLINK-12453][table-planner-blink] Simplify constructor of AggsHandlerCodeGenerator to explicitly tell which methods need to be generated
This closes #8378
---
.../codegen/agg/AggsHandlerCodeGenerator.scala | 45 ++++++++++++++++------
.../physical/batch/BatchExecOverAggregate.scala | 20 ++++++----
.../stream/StreamExecGlobalGroupAggregate.scala | 5 +--
.../physical/stream/StreamExecGroupAggregate.scala | 9 ++++-
.../StreamExecIncrementalGroupAggregate.scala | 5 +--
.../stream/StreamExecLocalGroupAggregate.scala | 10 ++++-
.../codegen/agg/AggsHandlerCodeGeneratorTest.scala | 11 ++++--
.../table/runtime/stream/sql/AggregateITCase.scala | 5 +--
8 files changed, 74 insertions(+), 36 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
index 9a66c9e..e5a9b7b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -45,9 +45,7 @@ class AggsHandlerCodeGenerator(
ctx: CodeGeneratorContext,
relBuilder: RelBuilder,
inputFieldTypes: Seq[InternalType],
- needRetract: Boolean,
- copyInputField: Boolean,
- needAccumulate: Boolean = true) {
+ copyInputField: Boolean) {
private val inputType = new RowType(inputFieldTypes: _*)
@@ -69,7 +67,9 @@ class AggsHandlerCodeGenerator(
private var ignoreAggValues: Array[Int] = Array()
- private var needMerge = false
+ private var isAccumulateNeeded = false
+ private var isRetractNeeded = false
+ private var isMergeNeeded = false
var valueType: RowType = _
@@ -132,22 +132,45 @@ class AggsHandlerCodeGenerator(
this
}
+
+ /**
+ * Tells the generator to generate `accumulate(..)` method for the [[AggsHandleFunction]] and
+ * [[NamespaceAggsHandleFunction]]. By default, the `accumulate(..)` method is not generated.
+ */
+ def needAccumulate(): AggsHandlerCodeGenerator = {
+ this.isAccumulateNeeded = true
+ this
+ }
+
+ /**
+ * Tells the generator to generate `retract(..)` method for the [[AggsHandleFunction]] and
+ * [[NamespaceAggsHandleFunction]]. By default, the `retract(..)` method is not generated.
+ *
+ * @return this generator instance, to allow method chaining
+ */
+ def needRetract(): AggsHandlerCodeGenerator = {
+ this.isRetractNeeded = true
+ this
+ }
+
/**
- * Sets merged accumulator information.
+ * Tells the generator to generate `merge(..)` method with the merged accumulator information
+ * for the [[AggsHandleFunction]] and [[NamespaceAggsHandleFunction]].
+ * By default, the `merge(..)` method is not generated.
*
* @param mergedAccOffset the mergedAcc may come from local aggregate,
* this is the first buffer offset in the row
* @param mergedAccOnHeap true if the mergedAcc is on heap, otherwise false
* @param mergedAccExternalTypes the merged acc types
*/
- def withMerging(
+ def needMerge(
mergedAccOffset: Int,
mergedAccOnHeap: Boolean,
mergedAccExternalTypes: Array[TypeInformation[_]] = null): AggsHandlerCodeGenerator = {
this.mergedAccOffset = mergedAccOffset
this.mergedAccOnHeap = mergedAccOnHeap
this.mergedAccExternalTypes = mergedAccExternalTypes
- this.needMerge = true
+ this.isMergeNeeded = true
this
}
@@ -230,7 +253,7 @@ class AggsHandlerCodeGenerator(
aggBufferOffset,
aggBufferSize,
hasNamespace,
- needMerge,
+ isMergeNeeded,
mergedAccOnHeap,
distinctInfo.consumeRetraction,
copyInputField,
@@ -540,7 +563,7 @@ class AggsHandlerCodeGenerator(
}
private def genAccumulate(): String = {
- if (needAccumulate) {
+ if (isAccumulateNeeded) {
// validation check
checkNeededMethods(needAccumulate = true)
@@ -563,7 +586,7 @@ class AggsHandlerCodeGenerator(
}
private def genRetract(): String = {
- if (needRetract) {
+ if (isRetractNeeded) {
// validation check
checkNeededMethods(needRetract = true)
@@ -586,7 +609,7 @@ class AggsHandlerCodeGenerator(
}
private def genMerge(): String = {
- if (needMerge) {
+ if (isMergeNeeded) {
// validation check
checkNeededMethods(needMerge = true)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index 0fe3eff..fd48fc1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -286,11 +286,12 @@ class BatchExecOverAggregate(
codeGenCtx,
relBuilder,
inputType.getFieldTypes,
- needRetract = false,
copyInputField = false)
// over agg code gen must pass the constants
- generator.withConstants(constants).generateAggsHandler(
- "BoundedOverAggregateHelper", aggInfoList)
+ generator
+ .needAccumulate()
+ .withConstants(constants)
+ .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
}.toArray
val resetAccumulators =
@@ -344,12 +345,14 @@ class BatchExecOverAggregate(
CodeGeneratorContext(config),
relBuilder,
inputType.getFieldTypes,
- needRetract = true,
copyInputField = false)
// over agg code gen must pass the constants
- val genAggsHandler = generator.withConstants(constants)
- .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
+ val genAggsHandler = generator
+ .needAccumulate()
+ .needRetract()
+ .withConstants(constants)
+ .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
// LEAD is behind the currentRow, so we need plus offset.
// LAG is in front of the currentRow, so we need minus offset.
@@ -414,11 +417,12 @@ class BatchExecOverAggregate(
codeGenCtx,
relBuilder,
inputType.getFieldTypes,
- needRetract = false,
copyInputField = false)
// over agg code gen must pass the constants
- val genAggsHandler = generator.withConstants(constants)
+ val genAggsHandler = generator
+ .needAccumulate()
+ .withConstants(constants)
.generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
mode match {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index e7f879d..a63b4e0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -212,12 +212,11 @@ class StreamExecGlobalGroupAggregate(
CodeGeneratorContext(config),
relBuilder,
FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
- needRetract = false,
- config.getNullCheck,
inputFieldCopy)
generator
- .withMerging(mergedAccOffset, mergedAccOnHeap, mergedAccExternalTypes)
+ .needAccumulate()
+ .needMerge(mergedAccOffset, mergedAccOnHeap, mergedAccExternalTypes)
.generateAggsHandler(name, aggInfoList)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index 8947998..863e92a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -146,13 +146,18 @@ class StreamExecGroupAggregate(
CodeGeneratorContext(tableConfig),
tableEnv.getRelBuilder,
inputRowType.getFieldTypes,
- needRetraction,
// TODO: heap state backend do not copy key currently, we have to copy input field
// TODO: copy is not need when state backend is rocksdb, improve this in future
// TODO: but other operators do not copy this input field.....
copyInputField = true)
- val aggsHandler = generator.generateAggsHandler("GroupAggsHandler", aggInfoList)
+ if (needRetraction) {
+ generator.needRetract()
+ }
+
+ val aggsHandler = generator
+ .needAccumulate()
+ .generateAggsHandler("GroupAggsHandler", aggInfoList)
val accTypes = aggInfoList.getAccTypes.map(createInternalTypeFromTypeInfo)
val aggValueTypes = aggInfoList.getActualValueTypes.map(createInternalTypeFromTypeInfo)
val recordEqualiser = new EqualiserCodeGenerator(aggValueTypes)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
index 868be32..6cd1a15 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
@@ -208,12 +208,11 @@ class StreamExecIncrementalGroupAggregate(
CodeGeneratorContext(config),
relBuilder,
FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
- needRetract = false,
- config.getNullCheck,
inputFieldCopy)
generator
- .withMerging(mergedAccOffset, mergedAccOnHeap = true, mergedAccExternalTypes)
+ .needAccumulate()
+ .needMerge(mergedAccOffset, mergedAccOnHeap = true, mergedAccExternalTypes)
.generateAggsHandler(name, aggInfoList)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
index 97d0288..79b7551 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
@@ -120,10 +120,16 @@ class StreamExecLocalGroupAggregate(
CodeGeneratorContext(tableEnv.getConfig),
tableEnv.getRelBuilder,
inRowType.getFieldTypes,
- needRetraction,
// the local aggregate result will be buffered, so need copy
copyInputField = true)
- generator.withMerging(mergedAccOffset = 0, mergedAccOnHeap = true)
+
+ generator
+ .needAccumulate()
+ .needMerge(mergedAccOffset = 0, mergedAccOnHeap = true)
+
+ if (needRetraction) {
+ generator.needRetract()
+ }
val aggsHandler = generator.generateAggsHandler("GroupAggsHandler", aggInfoList)
val aggFunction = new MiniBatchLocalGroupAggFunction(aggsHandler)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
index abc62e3..d9511a4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
@@ -82,12 +82,17 @@ class AggsHandlerCodeGeneratorTest extends AggTestBase {
}
private def getHandler(needRetract: Boolean, needMerge: Boolean): AggsHandleFunction = {
- val generator = new AggsHandlerCodeGenerator(ctx, relBuilder, inputTypes, needRetract, true)
+ val generator = new AggsHandlerCodeGenerator(ctx, relBuilder, inputTypes, true)
+ if (needRetract) {
+ generator.needRetract()
+ }
if (needMerge) {
- generator.withMerging(1, mergedAccOnHeap = true, Array(Types.LONG, Types.LONG,
+ generator.needMerge(1, mergedAccOnHeap = true, Array(Types.LONG, Types.LONG,
Types.DOUBLE, Types.LONG, imperativeAggFunc.getAccumulatorType))
}
- val handler = generator.generateAggsHandler("Test", aggInfoList).newInstance(classLoader)
+ val handler = generator
+ .needAccumulate()
+ .generateAggsHandler("Test", aggInfoList).newInstance(classLoader)
handler.open(new PerKeyStateDataViewStore(context.getRuntimeContext))
handler
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
index c536820..160680b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
@@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.dataformat.Decimal
import org.apache.flink.table.functions.aggfunctions.{ConcatWithRetractAggFunction, ConcatWsWithRetractAggFunction}
import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.VarSumAggFunction
import org.apache.flink.table.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsAggFunction}
@@ -33,7 +32,7 @@ import org.apache.flink.table.runtime.utils.StreamingWithMiniBatchTestBase.MiniB
import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils._
import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithAggTestBase, TestingRetractSink}
-import org.apache.flink.table.typeutils.{BigDecimalTypeInfo, DecimalTypeInfo}
+import org.apache.flink.table.typeutils.BigDecimalTypeInfo
import org.apache.flink.table.util.DateTimeTestUtil._
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
@@ -741,7 +740,6 @@ class AggregateITCase(
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
- @Ignore("[FLINK-12208] LIMIT is not supported")
@Test
def testDifferentTypesSumWithRetract(): Unit = {
val data = List(
@@ -1203,7 +1201,6 @@ class AggregateITCase(
@Test
def testCountDistinctWithBinaryRowSource(): Unit = {
- System.setProperty("org.codehaus.janino.source_debugging.enable", "true")
// this case is failed before, because of object reuse problem
val data = (0 until 100).map {i => ("1", "1", s"${i%50}", "1")}.toList
// use BinaryRow source here for BinaryString reuse