Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/09 09:37:03 UTC

[flink] branch master updated: [FLINK-12453][table-planner-blink] Simplify constructor of AggsHandlerCodeGenerator to explicitly tell which methods need to be generated

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 504e7be  [FLINK-12453][table-planner-blink] Simplify constructor of AggsHandlerCodeGenerator to explicitly tell which methods need to be generated
504e7be is described below

commit 504e7be7dcb2cad4184f9ffd1ef20d8b13f64f40
Author: Jark Wu <im...@gmail.com>
AuthorDate: Thu May 9 14:16:50 2019 +0800

    [FLINK-12453][table-planner-blink] Simplify constructor of AggsHandlerCodeGenerator to explicitly tell which methods need to be generated
    
    This closes #8378
---
 .../codegen/agg/AggsHandlerCodeGenerator.scala     | 45 ++++++++++++++++------
 .../physical/batch/BatchExecOverAggregate.scala    | 20 ++++++----
 .../stream/StreamExecGlobalGroupAggregate.scala    |  5 +--
 .../physical/stream/StreamExecGroupAggregate.scala |  9 ++++-
 .../StreamExecIncrementalGroupAggregate.scala      |  5 +--
 .../stream/StreamExecLocalGroupAggregate.scala     | 10 ++++-
 .../codegen/agg/AggsHandlerCodeGeneratorTest.scala | 11 ++++--
 .../table/runtime/stream/sql/AggregateITCase.scala |  5 +--
 8 files changed, 74 insertions(+), 36 deletions(-)
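
For context, this change replaces the `needRetract`/`needAccumulate` constructor flags and the
`withMerging(..)` setter with explicit `needAccumulate()`, `needRetract()` and `needMerge(..)`
calls on the generator. Below is a minimal usage sketch, not taken verbatim from the patch;
`ctx`, `relBuilder`, `inputFieldTypes`, `aggInfoList`, the merge arguments and the handler name
are assumed placeholders.

    // Before this commit: generated methods were selected via constructor flags.
    // val generator = new AggsHandlerCodeGenerator(
    //   ctx, relBuilder, inputFieldTypes, needRetract = true, copyInputField = false)
    // generator.withMerging(mergedAccOffset = 0, mergedAccOnHeap = true)

    // After this commit: the constructor only carries the shared inputs, and each
    // generated method is requested explicitly through the fluent API.
    val generator = new AggsHandlerCodeGenerator(
      ctx, relBuilder, inputFieldTypes, copyInputField = false)

    val handler = generator
      .needAccumulate()                                        // generate accumulate(..)
      .needRetract()                                           // generate retract(..)
      .needMerge(mergedAccOffset = 0, mergedAccOnHeap = true)  // generate merge(..)
      .generateAggsHandler("MyAggsHandler", aggInfoList)       // placeholder handler name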

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
index 9a66c9e..e5a9b7b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -45,9 +45,7 @@ class AggsHandlerCodeGenerator(
     ctx: CodeGeneratorContext,
     relBuilder: RelBuilder,
     inputFieldTypes: Seq[InternalType],
-    needRetract: Boolean,
-    copyInputField: Boolean,
-    needAccumulate: Boolean = true) {
+    copyInputField: Boolean) {
 
   private val inputType = new RowType(inputFieldTypes: _*)
 
@@ -69,7 +67,9 @@ class AggsHandlerCodeGenerator(
 
   private var ignoreAggValues: Array[Int] = Array()
 
-  private var needMerge = false
+  private var isAccumulateNeeded = false
+  private var isRetractNeeded = false
+  private var isMergeNeeded = false
 
   var valueType: RowType = _
 
@@ -132,22 +132,45 @@ class AggsHandlerCodeGenerator(
     this
   }
 
+
+  /**
+    * Tells the generator to generate the `accumulate(..)` method for the [[AggsHandleFunction]]
+    * and [[NamespaceAggsHandleFunction]]. By default, the `accumulate(..)` method is not generated.
+    */
+  def needAccumulate(): AggsHandlerCodeGenerator = {
+    this.isAccumulateNeeded = true
+    this
+  }
+
+  /**
+    * Tells the generator to generate the `retract(..)` method for the [[AggsHandleFunction]]
+    * and [[NamespaceAggsHandleFunction]]. By default, the `retract(..)` method is not generated.
+    *
+    * @return this generator instance
+    */
+  def needRetract(): AggsHandlerCodeGenerator = {
+    this.isRetractNeeded = true
+    this
+  }
+
   /**
-    * Sets merged accumulator information.
+    * Tells the generator to generate the `merge(..)` method, using the given merged accumulator
+    * information, for the [[AggsHandleFunction]] and [[NamespaceAggsHandleFunction]].
+    * By default, the `merge(..)` method is not generated.
     *
     * @param mergedAccOffset the mergedAcc may come from a local aggregate;
     *                         this is the first buffer offset in the row
     * @param mergedAccOnHeap true if the mergedAcc is on heap, false otherwise
     * @param mergedAccExternalTypes the merged acc types
     */
-  def withMerging(
+  def needMerge(
       mergedAccOffset: Int,
       mergedAccOnHeap: Boolean,
       mergedAccExternalTypes: Array[TypeInformation[_]] = null): AggsHandlerCodeGenerator = {
     this.mergedAccOffset = mergedAccOffset
     this.mergedAccOnHeap = mergedAccOnHeap
     this.mergedAccExternalTypes = mergedAccExternalTypes
-    this.needMerge = true
+    this.isMergeNeeded = true
     this
   }
 
@@ -230,7 +253,7 @@ class AggsHandlerCodeGenerator(
           aggBufferOffset,
           aggBufferSize,
           hasNamespace,
-          needMerge,
+          isMergeNeeded,
           mergedAccOnHeap,
           distinctInfo.consumeRetraction,
           copyInputField,
@@ -540,7 +563,7 @@ class AggsHandlerCodeGenerator(
   }
 
   private def genAccumulate(): String = {
-    if (needAccumulate) {
+    if (isAccumulateNeeded) {
       // validation check
       checkNeededMethods(needAccumulate = true)
 
@@ -563,7 +586,7 @@ class AggsHandlerCodeGenerator(
   }
 
   private def genRetract(): String = {
-    if (needRetract) {
+    if (isRetractNeeded) {
       // validation check
       checkNeededMethods(needRetract = true)
 
@@ -586,7 +609,7 @@ class AggsHandlerCodeGenerator(
   }
 
   private def genMerge(): String = {
-    if (needMerge) {
+    if (isMergeNeeded) {
       // validation check
       checkNeededMethods(needMerge = true)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index 0fe3eff..fd48fc1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -286,11 +286,12 @@ class BatchExecOverAggregate(
           codeGenCtx,
           relBuilder,
           inputType.getFieldTypes,
-          needRetract = false,
           copyInputField = false)
         // over agg code gen must pass the constants
-        generator.withConstants(constants).generateAggsHandler(
-          "BoundedOverAggregateHelper", aggInfoList)
+        generator
+          .needAccumulate()
+          .withConstants(constants)
+          .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
       }.toArray
 
       val resetAccumulators =
@@ -344,12 +345,14 @@ class BatchExecOverAggregate(
               CodeGeneratorContext(config),
               relBuilder,
               inputType.getFieldTypes,
-              needRetract = true,
               copyInputField = false)
 
             // over agg code gen must pass the constants
-            val genAggsHandler = generator.withConstants(constants)
-                .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
+            val genAggsHandler = generator
+              .needAccumulate()
+              .needRetract()
+              .withConstants(constants)
+              .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
 
             // LEAD is behind the currentRow, so we need plus offset.
             // LAG is in front of the currentRow, so we need minus offset.
@@ -414,11 +417,12 @@ class BatchExecOverAggregate(
             codeGenCtx,
             relBuilder,
             inputType.getFieldTypes,
-            needRetract = false,
             copyInputField = false)
 
           // over agg code gen must pass the constants
-          val genAggsHandler = generator.withConstants(constants)
+          val genAggsHandler = generator
+              .needAccumulate()
+              .withConstants(constants)
               .generateAggsHandler("BoundedOverAggregateHelper", aggInfoList)
 
           mode match {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index e7f879d..a63b4e0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -212,12 +212,11 @@ class StreamExecGlobalGroupAggregate(
       CodeGeneratorContext(config),
       relBuilder,
       FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
-      needRetract = false,
-      config.getNullCheck,
       inputFieldCopy)
 
     generator
-      .withMerging(mergedAccOffset, mergedAccOnHeap, mergedAccExternalTypes)
+      .needAccumulate()
+      .needMerge(mergedAccOffset, mergedAccOnHeap, mergedAccExternalTypes)
       .generateAggsHandler(name, aggInfoList)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index 8947998..863e92a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -146,13 +146,18 @@ class StreamExecGroupAggregate(
       CodeGeneratorContext(tableConfig),
       tableEnv.getRelBuilder,
       inputRowType.getFieldTypes,
-      needRetraction,
       // TODO: heap state backend do not copy key currently, we have to copy input field
       // TODO: copy is not need when state backend is rocksdb, improve this in future
       // TODO: but other operators do not copy this input field.....
       copyInputField = true)
 
-    val aggsHandler = generator.generateAggsHandler("GroupAggsHandler", aggInfoList)
+    if (needRetraction) {
+      generator.needRetract()
+    }
+
+    val aggsHandler = generator
+      .needAccumulate()
+      .generateAggsHandler("GroupAggsHandler", aggInfoList)
     val accTypes = aggInfoList.getAccTypes.map(createInternalTypeFromTypeInfo)
     val aggValueTypes = aggInfoList.getActualValueTypes.map(createInternalTypeFromTypeInfo)
     val recordEqualiser = new EqualiserCodeGenerator(aggValueTypes)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
index 868be32..6cd1a15 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
@@ -208,12 +208,11 @@ class StreamExecIncrementalGroupAggregate(
       CodeGeneratorContext(config),
       relBuilder,
       FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
-      needRetract = false,
-      config.getNullCheck,
       inputFieldCopy)
 
     generator
-      .withMerging(mergedAccOffset, mergedAccOnHeap = true, mergedAccExternalTypes)
+      .needAccumulate()
+      .needMerge(mergedAccOffset, mergedAccOnHeap = true, mergedAccExternalTypes)
       .generateAggsHandler(name, aggInfoList)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
index 97d0288..79b7551 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
@@ -120,10 +120,16 @@ class StreamExecLocalGroupAggregate(
       CodeGeneratorContext(tableEnv.getConfig),
       tableEnv.getRelBuilder,
       inRowType.getFieldTypes,
-      needRetraction,
       // the local aggregate result will be buffered, so need copy
       copyInputField = true)
-    generator.withMerging(mergedAccOffset = 0, mergedAccOnHeap = true)
+
+    generator
+      .needAccumulate()
+      .needMerge(mergedAccOffset = 0, mergedAccOnHeap = true)
+
+    if (needRetraction) {
+      generator.needRetract()
+    }
 
     val aggsHandler = generator.generateAggsHandler("GroupAggsHandler", aggInfoList)
     val aggFunction = new MiniBatchLocalGroupAggFunction(aggsHandler)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
index abc62e3..d9511a4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
@@ -82,12 +82,17 @@ class AggsHandlerCodeGeneratorTest extends AggTestBase {
   }
 
   private def getHandler(needRetract: Boolean, needMerge: Boolean): AggsHandleFunction = {
-    val generator = new AggsHandlerCodeGenerator(ctx, relBuilder, inputTypes, needRetract, true)
+    val generator = new AggsHandlerCodeGenerator(ctx, relBuilder, inputTypes, true)
+    if (needRetract) {
+      generator.needRetract()
+    }
     if (needMerge) {
-      generator.withMerging(1, mergedAccOnHeap = true, Array(Types.LONG, Types.LONG,
+      generator.needMerge(1, mergedAccOnHeap = true, Array(Types.LONG, Types.LONG,
         Types.DOUBLE, Types.LONG, imperativeAggFunc.getAccumulatorType))
     }
-    val handler = generator.generateAggsHandler("Test", aggInfoList).newInstance(classLoader)
+    val handler = generator
+      .needAccumulate()
+      .generateAggsHandler("Test", aggInfoList).newInstance(classLoader)
     handler.open(new PerKeyStateDataViewStore(context.getRuntimeContext))
     handler
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
index c536820..160680b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AggregateITCase.scala
@@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.dataformat.Decimal
 import org.apache.flink.table.functions.aggfunctions.{ConcatWithRetractAggFunction, ConcatWsWithRetractAggFunction}
 import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.VarSumAggFunction
 import org.apache.flink.table.runtime.batch.sql.agg.{MyPojoAggFunction, VarArgsAggFunction}
@@ -33,7 +32,7 @@ import org.apache.flink.table.runtime.utils.StreamingWithMiniBatchTestBase.MiniB
 import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils._
 import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithAggTestBase, TestingRetractSink}
-import org.apache.flink.table.typeutils.{BigDecimalTypeInfo, DecimalTypeInfo}
+import org.apache.flink.table.typeutils.BigDecimalTypeInfo
 import org.apache.flink.table.util.DateTimeTestUtil._
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
@@ -741,7 +740,6 @@ class AggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Ignore("[FLINK-12208] LIMIT is not supported")
   @Test
   def testDifferentTypesSumWithRetract(): Unit = {
     val data = List(
@@ -1203,7 +1201,6 @@ class AggregateITCase(
 
   @Test
   def testCountDistinctWithBinaryRowSource(): Unit = {
-    System.setProperty("org.codehaus.janino.source_debugging.enable", "true")
     // this case is failed before, because of object reuse problem
     val data = (0 until 100).map {i => ("1", "1", s"${i%50}", "1")}.toList
     // use BinaryRow source here for BinaryString reuse