You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/31 13:14:53 UTC
flink git commit: [FLINK-5915] [table] Forward the complete aggregate ArgList to aggregate runtime functions.
Repository: flink
Updated Branches:
refs/heads/master 48890285d -> 91c90c5d2
[FLINK-5915] [table] Forward the complete aggregate ArgList to aggregate runtime functions.
This closes #3647.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91c90c5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91c90c5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91c90c5d
Branch: refs/heads/master
Commit: 91c90c5d2d38c286e128777aa7b8e14fc5321575
Parents: 4889028
Author: shaoxuan-wang <ws...@gmail.com>
Authored: Fri Mar 31 17:07:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 31 15:14:37 2017 +0200
----------------------------------------------------------------------
.../table/runtime/aggregate/AggregateAggFunction.scala | 4 ++--
.../flink/table/runtime/aggregate/AggregateUtil.scala | 12 +++++++-----
.../BoundedProcessingOverRowProcessFunction.scala | 6 +++---
.../table/runtime/aggregate/DataSetAggFunction.scala | 4 ++--
.../table/runtime/aggregate/DataSetPreAggFunction.scala | 4 ++--
.../runtime/aggregate/DataSetWindowAggMapFunction.scala | 4 ++--
.../RangeClauseBoundedOverProcessFunction.scala | 6 +++---
.../RowsClauseBoundedOverProcessFunction.scala | 6 +++---
.../UnboundedEventTimeOverProcessFunction.scala | 10 +++++-----
...dedNonPartitionedProcessingOverProcessFunction.scala | 4 ++--
.../UnboundedProcessingOverProcessFunction.scala | 4 ++--
11 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 11d55e5..c608b97 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -33,7 +33,7 @@ import org.apache.flink.types.Row
*/
class AggregateAggFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int])
+ private val aggFields: Array[Array[Int]])
extends DataStreamAggFunc[Row, Row, Row] {
override def createAccumulator(): Row = {
@@ -51,7 +51,7 @@ class AggregateAggFunction(
var i = 0
while (i < aggregates.length) {
val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
- val v = value.getField(aggFields(i))
+ val v = value.getField(aggFields(i)(0))
aggregates(i).accumulate(acc, v)
i += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 88e9d68..74dc5cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -45,6 +45,7 @@ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeIn
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
object AggregateUtil {
@@ -886,10 +887,10 @@ object AggregateUtil {
aggregateCalls: Seq[AggregateCall],
inputType: RelDataType,
needRetraction: Boolean)
- : (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = {
+ : (Array[Array[Int]], Array[TableAggregateFunction[_ <: Any]]) = {
// store the aggregate fields of each aggregate function, by the same order of aggregates.
- val aggFieldIndexes = new Array[Int](aggregateCalls.size)
+ val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
val aggregates = new Array[TableAggregateFunction[_ <: Any]](aggregateCalls.size)
// create aggregate function instances by function type and aggregate field data type.
@@ -897,7 +898,7 @@ object AggregateUtil {
val argList: util.List[Integer] = aggregateCall.getArgList
if (argList.isEmpty) {
if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
- aggFieldIndexes(index) = 0
+ aggFieldIndexes(index) = Array[Int](0)
} else {
throw new TableException("Aggregate fields should not be empty.")
}
@@ -905,9 +906,10 @@ object AggregateUtil {
if (argList.size() > 1) {
throw new TableException("Currently, do not support aggregate on multi fields.")
}
- aggFieldIndexes(index) = argList.get(0)
+ aggFieldIndexes(index) = argList.asScala.map(i => i.intValue).toArray
}
- val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
+ val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)(0)).getType
+ .getSqlTypeName
aggregateCall.getAggregation match {
case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction =>
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
index 454b177..1b990ec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala
@@ -37,7 +37,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo
class BoundedProcessingOverRowProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
+ private val aggFields: Array[Array[Int]],
private val precedingOffset: Long,
private val forwardedFieldCount: Int,
private val aggregatesTypeInfo: RowTypeInfo,
@@ -118,7 +118,7 @@ class BoundedProcessingOverRowProcessFunction(
i = 0
while (i < aggregates.length) {
val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
- aggregates(i).retract(accumulator, retractList.get(0).getField(aggFields(i)))
+ aggregates(i).retract(accumulator, retractList.get(0).getField(aggFields(i)(0)))
i += 1
}
retractList.remove(0)
@@ -157,7 +157,7 @@ class BoundedProcessingOverRowProcessFunction(
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
- aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
output.setField(index, aggregates(i).getValue(accumulator))
i += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
index 5babcf2..867943e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
@@ -38,7 +38,7 @@ import org.apache.flink.util.{Collector, Preconditions}
*/
class DataSetAggFunction(
private val aggregates: Array[AggregateFunction[_ <: Any]],
- private val aggInFields: Array[Int],
+ private val aggInFields: Array[Array[Int]],
private val aggOutMapping: Array[(Int, Int)],
private val gkeyOutMapping: Array[(Int, Int)],
private val groupingSetsMapping: Array[(Int, Int)],
@@ -82,7 +82,7 @@ class DataSetAggFunction(
// accumulate
i = 0
while (i < aggregates.length) {
- aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)))
+ aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)(0)))
i += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
index c18dcdc..db49a53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
@@ -35,7 +35,7 @@ import org.apache.flink.util.{Collector, Preconditions}
*/
class DataSetPreAggFunction(
private val aggregates: Array[AggregateFunction[_ <: Any]],
- private val aggInFields: Array[Int],
+ private val aggInFields: Array[Array[Int]],
private val groupingKeys: Array[Int])
extends AbstractRichFunction
with GroupCombineFunction[Row, Row]
@@ -78,7 +78,7 @@ class DataSetPreAggFunction(
// accumulate
i = 0
while (i < aggregates.length) {
- aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)))
+ aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)(0)))
i += 1
}
// check if this record is the last record
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
index f7f3387..5cc7ada 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
@@ -36,7 +36,7 @@ import org.apache.flink.util.Preconditions
*/
class DataSetWindowAggMapFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
+ private val aggFields: Array[Array[Int]],
private val groupingKeys: Array[Int],
private val timeFieldPos: Int, // time field position in input row
private val tumbleTimeWindowSize: Option[Long],
@@ -62,7 +62,7 @@ class DataSetWindowAggMapFunction(
var i = 0
while (i < aggregates.length) {
val agg = aggregates(i)
- val fieldValue = input.getField(aggFields(i))
+ val fieldValue = input.getField(aggFields(i)(0))
val accumulator = agg.createAccumulator()
agg.accumulate(accumulator, fieldValue)
output.setField(groupingKeys.length + i, accumulator)
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
index 0c8555b..9a57c56 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala
@@ -40,7 +40,7 @@ import org.apache.flink.util.{Collector, Preconditions}
*/
class RangeClauseBoundedOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
+ private val aggFields: Array[Array[Int]],
private val forwardedFieldCount: Int,
private val aggregationStateType: RowTypeInfo,
private val inputRowType: RowTypeInfo,
@@ -160,7 +160,7 @@ class RangeClauseBoundedOverProcessFunction(
val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
aggregates(aggregatesIndex)
.retract(accumulator, retractDataList.get(dataListIndex)
- .getField(aggFields(aggregatesIndex)))
+ .getField(aggFields(aggregatesIndex)(0)))
aggregatesIndex += 1
}
dataListIndex += 1
@@ -177,7 +177,7 @@ class RangeClauseBoundedOverProcessFunction(
while (aggregatesIndex < aggregates.length) {
val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
aggregates(aggregatesIndex).accumulate(accumulator, inputs.get(dataListIndex)
- .getField(aggFields(aggregatesIndex)))
+ .getField(aggFields(aggregatesIndex)(0)))
aggregatesIndex += 1
}
dataListIndex += 1
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
index 1678d57..8cc1f26 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala
@@ -41,7 +41,7 @@ import org.apache.flink.util.{Collector, Preconditions}
*/
class RowsClauseBoundedOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
+ private val aggFields: Array[Array[Int]],
private val forwardedFieldCount: Int,
private val aggregationStateType: RowTypeInfo,
private val inputRowType: RowTypeInfo,
@@ -202,7 +202,7 @@ class RowsClauseBoundedOverProcessFunction(
i = 0
while (i < aggregates.length) {
val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
- aggregates(i).retract(accumulator, retractRow.getField(aggFields(i)))
+ aggregates(i).retract(accumulator, retractRow.getField(aggFields(i)(0)))
i += 1
}
}
@@ -212,7 +212,7 @@ class RowsClauseBoundedOverProcessFunction(
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
- aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
output.setField(index, aggregates(i).getValue(accumulator))
i += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
index 92faf7d..3f7f35f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
@@ -43,7 +43,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
*/
abstract class UnboundedEventTimeOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
+ private val aggFields: Array[Array[Int]],
private val forwardedFieldCount: Int,
private val intermediateType: TypeInformation[Row],
private val inputType: TypeInformation[Row])
@@ -217,7 +217,7 @@ abstract class UnboundedEventTimeOverProcessFunction(
*/
class UnboundedEventTimeRowsOverProcessFunction(
aggregates: Array[AggregateFunction[_]],
- aggFields: Array[Int],
+ aggFields: Array[Array[Int]],
forwardedFieldCount: Int,
intermediateType: TypeInformation[Row],
inputType: TypeInformation[Row])
@@ -250,7 +250,7 @@ class UnboundedEventTimeRowsOverProcessFunction(
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
- aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+ aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)(0)))
output.setField(index, aggregates(i).getValue(accumulator))
i += 1
}
@@ -269,7 +269,7 @@ class UnboundedEventTimeRowsOverProcessFunction(
*/
class UnboundedEventTimeRangeOverProcessFunction(
aggregates: Array[AggregateFunction[_]],
- aggFields: Array[Int],
+ aggFields: Array[Array[Int]],
forwardedFieldCount: Int,
intermediateType: TypeInformation[Row],
inputType: TypeInformation[Row])
@@ -294,7 +294,7 @@ class UnboundedEventTimeRangeOverProcessFunction(
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator]
- aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)))
+ aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)(0)))
i += 1
}
j += 1
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
index 7750511..713d91b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
@@ -37,7 +37,7 @@ import org.apache.flink.util.{Collector, Preconditions}
*/
class UnboundedNonPartitionedProcessingOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
+ private val aggFields: Array[Array[Int]],
private val forwardedFieldCount: Int,
private val aggregationStateType: RowTypeInfo)
extends ProcessFunction[Row, Row] with CheckpointedFunction{
@@ -82,7 +82,7 @@ class UnboundedNonPartitionedProcessingOverProcessFunction(
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
- aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
output.setField(index, aggregates(i).getValue(accumulator))
i += 1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
index 41f8e8c..0276290 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
class UnboundedProcessingOverProcessFunction(
private val aggregates: Array[AggregateFunction[_]],
- private val aggFields: Array[Int],
+ private val aggFields: Array[Array[Int]],
private val forwardedFieldCount: Int,
private val aggregationStateType: RowTypeInfo)
extends ProcessFunction[Row, Row]{
@@ -75,7 +75,7 @@ class UnboundedProcessingOverProcessFunction(
while (i < aggregates.length) {
val index = forwardedFieldCount + i
val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
- aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
+ aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
output.setField(index, aggregates(i).getValue(accumulator))
i += 1
}