Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/23 09:59:18 UTC

[GitHub] twalthr commented on a change in pull request #7059: [FLINK-10689][table] Port UDFs in Table API extension points to flink-table-common

URL: https://github.com/apache/flink/pull/7059#discussion_r235888819
 
 

 ##########
 File path: flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
 ##########
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Base class for User-Defined Aggregates.
+ *
+ * <p>The behavior of an {@link AggregateFunction} can be defined by implementing a series of custom
+ * methods. An {@link AggregateFunction} needs at least three methods:
+ *  - createAccumulator,
+ *  - accumulate, and
+ *  - getValue.
+ *
+ * <p>A few other methods are optional:
+ *  - retract,
+ *  - merge, and
+ *  - resetAccumulator
+ *
+ * <p>All of these methods must be declared public, must not be static, and must be named exactly
+ * as listed above. The methods createAccumulator and getValue are defined in the
+ * {@link AggregateFunction} class, while the other methods are explained below.
+ *
+ * <p>Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
+ * requires at least one accumulate() method.
+ *
+ * {@code
+ * param accumulator           the accumulator which contains the current aggregated results
+ * param userDefinedInputs the input value (usually obtained from newly arrived data).
+ *
+ * def accumulate(accumulator: ACC, userDefinedInputs): Unit
+ * }
+ *
+ * <p>Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This function must be implemented for
+ * datastream bounded over aggregate.
+ *
+ * {@code
+ * param accumulator           the accumulator which contains the current aggregated results
+ * param userDefinedInputs the input value (usually obtained from newly arrived data).
+ *
+ * def retract(accumulator: ACC, userDefinedInputs): Unit
+ * }
+ *
+ * <p>Merges a group of accumulator instances into one accumulator instance. This function must be
+ * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
+ *
+ * {@code
+ * param accumulator  the accumulator which will keep the merged aggregate results. It should
+ *                     be noted that the accumulator may contain the previous aggregated
+ *                     results. Therefore, the user should not replace or clean this instance in
+ *                     the custom merge method.
+ * param its          an {@link java.lang.Iterable} pointing to a group of accumulators that will
+ *                     be merged.
+ *
+ * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
+ * }
+ *
+ * <p>Resets the accumulator for this {@link AggregateFunction}. This function must be implemented for
+ * dataset grouping aggregate.
+ *
+ * {@code
+ * param accumulator  the accumulator which needs to be reset
+ *
+ * def resetAccumulator(accumulator: ACC): Unit
+ * }
+ *
+ *
+ * @param T   the type of the aggregation result
+ * @param ACC the type of the aggregation accumulator. The accumulator is used to keep the
+ *             aggregated values which are needed to compute an aggregation result.
+ *             An AggregateFunction represents its state using the accumulator; therefore, the
+ *             state of the AggregateFunction must be put into the accumulator.
+ */
+public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
 
 Review comment:
   Add proper annotations to all classes.
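
   For readers of this thread, the method contract described in the Javadoc above can be sketched roughly as follows. This is a minimal illustration only, not code from the PR: WeightedAvgSketch, WeightedAvgAccum and WeightedAvg are hypothetical names, and the nested AggregateFunction is a stand-in reduced to the two methods the Javadoc says the base class defines, so that the example compiles without Flink on the classpath.

```java
import java.util.Arrays;

public class WeightedAvgSketch {

    /** Hypothetical accumulator (the ACC type parameter): all aggregate state lives here. */
    public static class WeightedAvgAccum {
        public long sum = 0L;
        public long count = 0L;
    }

    /** Stand-in for the base class in the diff, reduced to createAccumulator and getValue. */
    public abstract static class AggregateFunction<T, ACC> {
        public abstract ACC createAccumulator();
        public abstract T getValue(ACC accumulator);
    }

    /** Hypothetical aggregate; every contract method is public, non-static, and named as in the Javadoc. */
    public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

        @Override
        public WeightedAvgAccum createAccumulator() {
            return new WeightedAvgAccum();
        }

        @Override
        public Long getValue(WeightedAvgAccum acc) {
            if (acc.count == 0) {
                return null; // nothing accumulated yet
            }
            return acc.sum / acc.count;
        }

        // Required: folds one input (value, weight) into the accumulator.
        public void accumulate(WeightedAvgAccum acc, long value, int weight) {
            acc.sum += value * weight;
            acc.count += weight;
        }

        // Optional: undoes a previously accumulated input.
        public void retract(WeightedAvgAccum acc, long value, int weight) {
            acc.sum -= value * weight;
            acc.count -= weight;
        }

        // Optional: adds the other accumulators to acc without replacing or clearing acc.
        public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> its) {
            for (WeightedAvgAccum other : its) {
                acc.sum += other.sum;
                acc.count += other.count;
            }
        }

        // Optional: returns the accumulator to its initial state.
        public void resetAccumulator(WeightedAvgAccum acc) {
            acc.sum = 0L;
            acc.count = 0L;
        }
    }

    public static void main(String[] args) {
        WeightedAvg f = new WeightedAvg();
        WeightedAvgAccum a = f.createAccumulator();
        f.accumulate(a, 10L, 2);
        f.accumulate(a, 20L, 3);
        System.out.println(f.getValue(a)); // (10*2 + 20*3) / 5 = 16

        f.retract(a, 10L, 2);
        System.out.println(f.getValue(a)); // 60 / 3 = 20

        WeightedAvgAccum b = f.createAccumulator();
        f.accumulate(b, 40L, 1);
        f.merge(a, Arrays.asList(b));
        System.out.println(f.getValue(a)); // (60 + 40) / 4 = 25

        f.resetAccumulator(a);
        System.out.println(f.getValue(a)); // null
    }
}
```

   Note that accumulate, retract, merge and resetAccumulator are not declared in the stand-in base class; per the Javadoc they are found by name, which is why they must be public and non-static.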
