Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/13 16:04:10 UTC

[GitHub] [flink] twalthr opened a new pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

twalthr opened a new pull request #13143:
URL: https://github.com/apache/flink/pull/13143


   ## What is the purpose of the change
   
   Ensures that all documentation about aggregate functions is correct and consistent.
   
   ## Brief change log
   
   - Removes mentions of `resetAccumulator`
   - Updates JavaDocs and website docs for `AggregateFunction` and `TableAggregateFunction`
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? docs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr closed pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #13143:
URL: https://github.com/apache/flink/pull/13143


   





[GitHub] [flink] flinkbot commented on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673566801


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit e3f355ba1e729543a1b902c6f3fba3c6836f5023 (Thu Aug 13 16:07:00 UTC 2020)
   
   **Warnings:**
    * Documentation files were touched, but no `.zh.md` files: Update Chinese documentation or file Jira ticket.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673578366


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e3f355ba1e729543a1b902c6f3fba3c6836f5023",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512",
       "triggerID" : "e3f355ba1e729543a1b902c6f3fba3c6836f5023",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47d9b86d8a44e691fd0779b66bb702d3424aa886",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5525",
       "triggerID" : "47d9b86d8a44e691fd0779b66bb702d3424aa886",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e3f355ba1e729543a1b902c6f3fba3c6836f5023 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512) 
   * 47d9b86d8a44e691fd0779b66bb702d3424aa886 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5525) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673578366


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e3f355ba1e729543a1b902c6f3fba3c6836f5023",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512",
       "triggerID" : "e3f355ba1e729543a1b902c6f3fba3c6836f5023",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47d9b86d8a44e691fd0779b66bb702d3424aa886",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5525",
       "triggerID" : "47d9b86d8a44e691fd0779b66bb702d3424aa886",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c1a06719b7ee5be9dbe0c1c1a812991bca0349c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9c1a06719b7ee5be9dbe0c1c1a812991bca0349c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e3f355ba1e729543a1b902c6f3fba3c6836f5023 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512) 
   * 47d9b86d8a44e691fd0779b66bb702d3424aa886 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5525) 
   * 9c1a06719b7ee5be9dbe0c1c1a812991bca0349c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] sjwiesman commented on a change in pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #13143:
URL: https://github.com/apache/flink/pull/13143#discussion_r470136543



##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -1268,693 +1044,770 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] {
     }
   }
 
-  def resetAccumulator(acc: WeightedAvgAccum): Unit = {
+  def resetAccumulator(acc: WeightedAvgAccumulator): Unit = {
     acc.count = 0
     acc.sum = 0L
   }
+}
 
-  override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
-    new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)
-  }
+val env = TableEnvironment.create(...)
 
-  override def getResultType: TypeInformation[JLong] = Types.LONG
-}
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call(classOf[WeightedAvg], $"value", $"weight"))
 
 // register function
-val tEnv: StreamTableEnvironment = ???
-tEnv.registerFunction("wAvg", new WeightedAvg())
+env.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])
 
-// use function
-tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+// call registered function in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call("WeightedAvg", $"value", $"weight"))
 
+// call registered function in SQL
+env.sqlQuery(
+  "SELECT myField, WeightedAvg(value, weight) FROM MyTable GROUP BY myField"
+)
 {% endhighlight %}
 </div>
 
-<div data-lang="python" markdown="1">
-{% highlight python %}
-'''
-Java code:
-
-/**
- * Accumulator for WeightedAvg.
- */
-public static class WeightedAvgAccum {
-    public long sum = 0;
-    public int count = 0;
-}
-
-// The java class must have a public no-argument constructor and can be founded in current java classloader.
-
-/**
- * Weighted Average user-defined aggregate function.
- */
-public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
-
-    @Override
-    public WeightedAvgAccum createAccumulator() {
-        return new WeightedAvgAccum();
-    }
-
-    @Override
-    public Long getValue(WeightedAvgAccum acc) {
-        if (acc.count == 0) {
-            return null;
-        } else {
-            return acc.sum / acc.count;
-        }
-    }
-
-    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum += iValue * iWeight;
-        acc.count += iWeight;
-    }
-
-    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum -= iValue * iWeight;
-        acc.count -= iWeight;
-    }
-    
-    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
-        Iterator<WeightedAvgAccum> iter = it.iterator();
-        while (iter.hasNext()) {
-            WeightedAvgAccum a = iter.next();
-            acc.count += a.count;
-            acc.sum += a.sum;
-        }
-    }
-    
-    public void resetAccumulator(WeightedAvgAccum acc) {
-        acc.count = 0;
-        acc.sum = 0L;
-    }
-}
-'''
-
-# register function
-t_env = ...  # type: StreamTableEnvironment
-t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
+</div>
 
-# use function
-t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
+define a class `WeightedAvgAccumulator` to be the accumulator. Accumulators are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
 
-{% endhighlight %}
-</div>
-</div>
+### Mandatory and Optional Methods
 
-{% top %}
+**The following methods are mandatory for each `AggregateFunction`:**
 
-Table Aggregation Functions
----------------------
+- `createAccumulator()`
+- `accumulate(...)` 
+- `getValue(...)`
+ 
+Additionally, there are a few methods that can be optionally implemented. While some of these methods
+allow the system more efficient query execution, others are mandatory for certain use cases. For instance,
+the `merge(...)` method is mandatory if the aggregation function should be applied in the context of a
+session group window (the accumulators of two session windows need to be joined when a row is observed
+that "connects" them).
 
-User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or more rows with one or more attributes) to a result table with multi rows and columns. 
+**The following methods of `AggregateFunction` are required depending on the use case:**
 
-<center>
-<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" width="80%">
-</center>
+- `retract(...)` is required for aggregations on `OVER` windows.
+- `merge(...)` is required for many bounded aggregations and session window aggregations.
 
-The above figure shows an example of a table aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, `id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest prices of all beverages in the table, i.e., perform a `top2()` table aggregation. You would need to check each of the 5 rows and the result would be a table with the top 2 values.
+If the aggregate function can only be applied in an OVER window, this can be declared by returning the
+requirement `FunctionRequirement.OVER_WINDOW_ONLY` in `getRequirements()`.
 
-User-defined table aggregation functions are implemented by extending the `TableAggregateFunction` class. A `TableAggregateFunction` works as follows. First, it needs an `accumulator`, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the `createAccumulator()` method of the `TableAggregateFunction`. Subsequently, the `accumulate()` method of the function is called for each input row to update the accumulator. Once all rows have been processed, the `emitValue()` method of the function is called to compute and return the final results. 
+If an accumulator needs to store large amounts of data, `org.apache.flink.table.api.dataview.ListView`
+and `org.apache.flink.table.api.dataview.MapView` provide advanced features for leveraging Flink's state
+backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
+about this advanced feature.
 
-**The following methods are mandatory for each `TableAggregateFunction`:**
+Since some of methods are optional or can be overloaded, the methods are called by generated code. The
+base class does not always provide a signature to be overridden by the concrete implementation class. Nevertheless,
+all mentioned methods must be declared publicly, not static, and named exactly as the names mentioned above
+to be called.

Review comment:
       ```suggestion
   Since some of the methods are optional, or can be overloaded, the runtime invokes aggregate function methods via generated code. This means the base class does not always provide a signature to be overridden by the concrete implementation. Nevertheless,
   all mentioned methods must be declared publicly, not static, and named exactly as the names mentioned above
   to be called.
   ```
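
For illustration, here is a minimal sketch of the contract described in the paragraph under review. It deliberately has no Flink dependency: `WeightedAvg` is a plain class standing in for one that would extend `AggregateFunction<Long, WeightedAvgAccumulator>`, and the helper `isCallable` is hypothetical. It uses reflection only to mimic the "public, not static, exactly named" rule; Flink itself calls these methods from generated code, not through this helper.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collections;

public class WeightedAvgSketch {

    /** Mutable accumulator holding the weighted sum and the total weight. */
    public static class WeightedAvgAccumulator {
        public long sum = 0L;
        public int count = 0;
    }

    /** Stand-in for a class that would extend Flink's AggregateFunction (assumption). */
    public static class WeightedAvg {

        public WeightedAvgAccumulator createAccumulator() {
            return new WeightedAvgAccumulator();
        }

        public Long getValue(WeightedAvgAccumulator acc) {
            // No input seen yet: there is no meaningful average.
            return acc.count == 0 ? null : acc.sum / acc.count;
        }

        // Not declared in the base class; found by name, so it must be public and non-static.
        public void accumulate(WeightedAvgAccumulator acc, long value, int weight) {
            acc.sum += value * weight;
            acc.count += weight;
        }

        // Undoes a previous accumulate(...) call with the same arguments.
        public void retract(WeightedAvgAccumulator acc, long value, int weight) {
            acc.sum -= value * weight;
            acc.count -= weight;
        }

        // Folds other accumulators into acc, e.g. when two session windows are joined.
        public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
            for (WeightedAvgAccumulator other : others) {
                acc.sum += other.sum;
                acc.count += other.count;
            }
        }
    }

    /** Hypothetical helper: is there a public, non-static method with exactly this name? */
    public static boolean isCallable(Class<?> clazz, String name) {
        for (Method m : clazz.getMethods()) { // getMethods() returns public methods only
            if (m.getName().equals(name) && !Modifier.isStatic(m.getModifiers())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        WeightedAvg function = new WeightedAvg();
        WeightedAvgAccumulator acc = function.createAccumulator();

        function.accumulate(acc, 10L, 1);
        function.accumulate(acc, 20L, 3);
        System.out.println(function.getValue(acc)); // (10*1 + 20*3) / 4 = 17 with long division

        function.retract(acc, 20L, 3);
        System.out.println(function.getValue(acc)); // back to 10

        WeightedAvgAccumulator other = function.createAccumulator();
        function.accumulate(other, 15L, 2);
        function.merge(acc, Collections.singletonList(other));
        System.out.println(function.getValue(acc)); // (10 + 30) / 3 = 13

        System.out.println(isCallable(WeightedAvg.class, "accumulate")); // true
        System.out.println(isCallable(WeightedAvg.class, "Accumulate")); // false: name must match exactly
    }
}
```

The last two lines show why the naming rule matters: a method that is misspelled, non-public, or static would simply not be found by a name-based lookup, and no compiler error would point that out.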

##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -729,7 +725,7 @@ If you intend to implement or call functions in Python, please refer to the [Pyt
 Table Functions
 ---------------
 
-Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input arguments. However, in contrast to a scalar function, it can return an arbitrary number of rows (or structured types) as output instead of a single value. The returned record may consist of one or more fields. If an output record consists of only one field, the structured record can be omitted and a scalar value can be emitted. It will be wrapped into an implicit row by the runtime.
+Similar to a user-defined scalar function, a user-defined table function (_UDTF_) takes zero, one, or multiple scalar values as input arguments. However, in contrast to a scalar function, it can return an arbitrary number of rows (or structured types) as output instead of a single value. The returned record may consist of one or more fields. If an output record consists of only one field, the structured record can be omitted and a scalar value can be emitted. It will be wrapped into an implicit row by the runtime.

Review comment:
       ```suggestion
   Similar to a user-defined scalar function, a user-defined table function (_UDTF_) takes zero, one, or multiple scalar values as input arguments. However, it can return an arbitrary number of rows (or structured types) as output instead of a single value. The returned record may consist of one or more fields. If an output record consists of only a single field, the structured record can be omitted, and a scalar value can be emitted. The runtime will implicitly wrap it into a row.
   ```

##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -1268,693 +1044,770 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] {
     }
   }
 
-  def resetAccumulator(acc: WeightedAvgAccum): Unit = {
+  def resetAccumulator(acc: WeightedAvgAccumulator): Unit = {
     acc.count = 0
     acc.sum = 0L
   }
+}
 
-  override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
-    new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)
-  }
+val env = TableEnvironment.create(...)
 
-  override def getResultType: TypeInformation[JLong] = Types.LONG
-}
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call(classOf[WeightedAvg], $"value", $"weight"))
 
 // register function
-val tEnv: StreamTableEnvironment = ???
-tEnv.registerFunction("wAvg", new WeightedAvg())
+env.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])
 
-// use function
-tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+// call registered function in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call("WeightedAvg", $"value", $"weight"))
 
+// call registered function in SQL
+env.sqlQuery(
+  "SELECT myField, WeightedAvg(value, weight) FROM MyTable GROUP BY myField"
+)
 {% endhighlight %}
 </div>
 
-<div data-lang="python" markdown="1">
-{% highlight python %}
-'''
-Java code:
-
-/**
- * Accumulator for WeightedAvg.
- */
-public static class WeightedAvgAccum {
-    public long sum = 0;
-    public int count = 0;
-}
-
-// The java class must have a public no-argument constructor and can be founded in current java classloader.
-
-/**
- * Weighted Average user-defined aggregate function.
- */
-public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
-
-    @Override
-    public WeightedAvgAccum createAccumulator() {
-        return new WeightedAvgAccum();
-    }
-
-    @Override
-    public Long getValue(WeightedAvgAccum acc) {
-        if (acc.count == 0) {
-            return null;
-        } else {
-            return acc.sum / acc.count;
-        }
-    }
-
-    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum += iValue * iWeight;
-        acc.count += iWeight;
-    }
-
-    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum -= iValue * iWeight;
-        acc.count -= iWeight;
-    }
-    
-    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
-        Iterator<WeightedAvgAccum> iter = it.iterator();
-        while (iter.hasNext()) {
-            WeightedAvgAccum a = iter.next();
-            acc.count += a.count;
-            acc.sum += a.sum;
-        }
-    }
-    
-    public void resetAccumulator(WeightedAvgAccum acc) {
-        acc.count = 0;
-        acc.sum = 0L;
-    }
-}
-'''
-
-# register function
-t_env = ...  # type: StreamTableEnvironment
-t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
+</div>
 
-# use function
-t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
+define a class `WeightedAvgAccumulator` to be the accumulator. Accumulators are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
 
-{% endhighlight %}
-</div>
-</div>
+### Mandatory and Optional Methods
 
-{% top %}
+**The following methods are mandatory for each `AggregateFunction`:**
 
-Table Aggregation Functions
----------------------
+- `createAccumulator()`
+- `accumulate(...)` 
+- `getValue(...)`
+ 
+Additionally, there are a few methods that can be optionally implemented. While some of these methods
+allow the system more efficient query execution, others are mandatory for certain use cases. For instance,
+the `merge(...)` method is mandatory if the aggregation function should be applied in the context of a
+session group window (the accumulators of two session windows need to be joined when a row is observed
+that "connects" them).
 
-User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or more rows with one or more attributes) to a result table with multi rows and columns. 
+**The following methods of `AggregateFunction` are required depending on the use case:**
 
-<center>
-<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" width="80%">
-</center>
+- `retract(...)` is required for aggregations on `OVER` windows.
+- `merge(...)` is required for many bounded aggregations and session window aggregations.
 
-The above figure shows an example of a table aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, `id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest prices of all beverages in the table, i.e., perform a `top2()` table aggregation. You would need to check each of the 5 rows and the result would be a table with the top 2 values.
+If the aggregate function can only be applied in an OVER window, this can be declared by returning the
+requirement `FunctionRequirement.OVER_WINDOW_ONLY` in `getRequirements()`.
 
-User-defined table aggregation functions are implemented by extending the `TableAggregateFunction` class. A `TableAggregateFunction` works as follows. First, it needs an `accumulator`, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the `createAccumulator()` method of the `TableAggregateFunction`. Subsequently, the `accumulate()` method of the function is called for each input row to update the accumulator. Once all rows have been processed, the `emitValue()` method of the function is called to compute and return the final results. 
+If an accumulator needs to store large amounts of data, `org.apache.flink.table.api.dataview.ListView`
+and `org.apache.flink.table.api.dataview.MapView` provide advanced features for leveraging Flink's state
+backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
+about this advanced feature.
 
-**The following methods are mandatory for each `TableAggregateFunction`:**
+Since some of methods are optional or can be overloaded, the methods are called by generated code. The
+base class does not always provide a signature to be overridden by the concrete implementation class. Nevertheless,
+all mentioned methods must be declared publicly, not static, and named exactly as the names mentioned above
+to be called.
 
-- `createAccumulator()`
-- `accumulate()` 
+Detailed documentation for all methods that are not declared in `AggregateFunction` and called by generated
+code is given below.
 
-Flink’s type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So similar to `ScalarFunction` and `TableFunction`, `TableAggregateFunction` provides methods to specify the `TypeInformation` of the result type (through 
- `TableAggregateFunction#getResultType()`) and the type of the accumulator (through `TableAggregateFunction#getAccumulatorType()`).
- 
-Besides the above methods, there are a few contracted methods that can be 
-optionally implemented. While some of these methods allow the system more efficient query execution, others are mandatory for certain use cases. For instance, the `merge()` method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that "connects" them). 
+**`accumulate(...)`**
+<div class="codetabs" markdown="1">
 
-**The following methods of `TableAggregateFunction` are required depending on the use case:**
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+public void accumulate(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-- `retract()` is required for aggregations on bounded `OVER` windows.
-- `merge()` is required for many batch aggregations and session window aggregations.
-- `resetAccumulator()` is required for many batch aggregations.
-- `emitValue()` is required for batch and window aggregations.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+def accumulate(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-**The following methods of `TableAggregateFunction` are used to improve the performance of streaming jobs:**
+</div>
 
-- `emitUpdateWithRetract()` is used to emit values that have been updated under retract mode.
+**`retract(...)`**
+<div class="codetabs" markdown="1">
 
-For `emitValue` method, it emits full data according to the accumulator. Take TopN as an example, `emitValue` emit all top n values each time. This may bring performance problems for streaming jobs. To improve the performance, a user can also implement `emitUpdateWithRetract` method to improve the performance. The method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new updated ones. The method will be used in preference to the `emitValue` method if they are all defined in the table aggregate function, because `emitUpdateWithRetract` is treated to be more efficient than `emitValue` as it can output values incrementally.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+public void retract(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-All methods of `TableAggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getResultType`, and `getAccumulatorType` are defined in the parent abstract class of `TableAggregateFunction`, while others are contracted methods. In order to define a table aggregate function, one has to extend the base class `org.apache.flink.table.functions.TableAggregateFunction` and implement one (or more) `accumulate` methods. The method `accumulate` can be overloaded with different parameter types and supports variable arguments.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+def retract(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-Detailed documentation for all methods of `TableAggregateFunction` is given below. 
+</div>
 
+**`merge(...)`**
 <div class="codetabs" markdown="1">
+
 <div data-lang="java" markdown="1">
 {% highlight java %}
-
-/**
-  * Base class for user-defined aggregates and table aggregates.
-  *
-  * @param <T>   the type of the aggregation result.
-  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  */
-public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {
-
-  /**
-    * Creates and init the Accumulator for this (table)aggregate function.
-    *
-    * @return the accumulator with the initial value
-    */
-  public ACC createAccumulator(); // MANDATORY
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's result.
-    *
-    * @return The TypeInformation of the (table)aggregate function's result or null if the result
-    *         type should be automatically inferred.
-    */
-  public TypeInformation<T> getResultType = null; // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's accumulator.
-    *
-    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
-    *         accumulator type should be automatically inferred.
-    */
-  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
-}
-
-/**
-  * Base class for table aggregation functions. 
-  *
-  * @param <T>   the type of the aggregation result
-  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute a table aggregation result.
-  *             TableAggregateFunction represents its state using accumulator, thereby the state of
-  *             the TableAggregateFunction must be put into the accumulator.
-  */
-public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
-
-  /** Processes the input values and update the provided accumulator instance. The method
-    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
-    * requires at least one accumulate() method.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY
-
-  /**
-    * Retracts the input values from the accumulator instance. The current design assumes the
-    * inputs are the values that have been previously accumulated. The method retract can be
-    * overloaded with different custom types and arguments. This function must be implemented for
-    * datastream bounded over aggregate.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL
-
-  /**
-    * Merges a group of accumulator instances into one accumulator instance. This function must be
-    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
-    *
-    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
-    *                     be noted that the accumulator may contain the previous aggregated
-    *                     results. Therefore user should not replace or clean this instance in the
-    *                     custom merge method.
-    * @param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be
-    *                     merged.
-    */
-  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
-
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result  (periodically emitted as data arrive) or
-    * the final result of the  aggregation.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the collector used to output data
-    */
-  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
-  
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result (periodically emitted as data arrive) or
-    * the final result of the aggregation.
-    *
-    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
-    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
-    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
-    * method will be used in preference to the emitValue method if both methods are defined in the
-    * table aggregate function, because the method is treated to be more efficient than emitValue
-    * as it can outputvalues incrementally.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the retractable collector used to output data. Use collect method
-    *                    to output(add) records and use retract method to retract(delete)
-    *                    records.
-    */
-  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
-  
-  /**
-    * Collects a record and forwards it. The collector can output retract messages with the retract
-    * method. Note: only use it in {@code emitRetractValueIncrementally}.
-    */
-  public interface RetractableCollector<T> extends Collector<T> {
-
-      /**
-        * Retract a record.
-        *
-        * @param record The record to retract.
-        */
-      void retract(T record);
-  }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore user should not replace or clean this instance in the
+ *                    custom merge method.
+ * param: iterable    a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ */
+public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-/**
-  * Base class for user-defined aggregates and table aggregates.
-  *
-  * @tparam T   the type of the aggregation result.
-  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  */
-abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction {
-
-  /**
-    * Creates and init the Accumulator for this (table)aggregate function.
-    *
-    * @return the accumulator with the initial value
-    */
-  def createAccumulator(): ACC // MANDATORY
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's result.
-    *
-    * @return The TypeInformation of the (table)aggregate function's result or null if the result
-    *         type should be automatically inferred.
-    */
-  def getResultType: TypeInformation[T] = null // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's accumulator.
-    *
-    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
-    *         accumulator type should be automatically inferred.
-    */
-  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
-}
-
-/**
-  * Base class for table aggregation functions. 
-  *
-  * @tparam T   the type of the aggregation result
-  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  *             TableAggregateFunction represents its state using accumulator, thereby the state of
-  *             the TableAggregateFunction must be put into the accumulator.
-  */
-abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, ACC] {
-
-  /**
-    * Processes the input values and update the provided accumulator instance. The method
-    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
-    * requires at least one accumulate() method.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY
-
-  /**
-    * Retracts the input values from the accumulator instance. The current design assumes the
-    * inputs are the values that have been previously accumulated. The method retract can be
-    * overloaded with different custom types and arguments. This function must be implemented for
-    * datastream bounded over aggregate.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL
-
-  /**
-    * Merges a group of accumulator instances into one accumulator instance. This function must be
-    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
-    *
-    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
-    *                     be noted that the accumulator may contain the previous aggregated
-    *                     results. Therefore user should not replace or clean this instance in the
-    *                     custom merge method.
-    * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
-    *                     merged.
-    */
-  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL
-  
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result  (periodically emitted as data arrive) or
-    * the final result of the  aggregation.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the collector used to output data
-    */
-  def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL
-
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result (periodically emitted as data arrive) or
-    * the final result of the aggregation.
-    *
-    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
-    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
-    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
-    * method will be used in preference to the emitValue method if both methods are defined in the
-    * table aggregate function, because the method is treated to be more efficient than emitValue
-    * as it can outputvalues incrementally.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the retractable collector used to output data. Use collect method
-    *                    to output(add) records and use retract method to retract(delete)
-    *                    records.
-    */
-  def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit // OPTIONAL
- 
-  /**
-    * Collects a record and forwards it. The collector can output retract messages with the retract
-    * method. Note: only use it in `emitRetractValueIncrementally`.
-    */
-  trait RetractableCollector[T] extends Collector[T] {
-    
-    /**
-      * Retract a record.
-      *
-      * @param record The record to retract.
-      */
-    def retract(record: T): Unit
-  }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore user should not replace or clean this instance in the
+ *                    custom merge method.
+ * param: iterable    a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ */
+def merge(accumulator: ACC, iterable: java.lang.Iterable[ACC]): Unit
 {% endhighlight %}
 </div>
+
 </div>
 
+{% top %}
+
+Table Aggregate Functions
+-------------------------
+
+A user-defined table aggregate function (_UDTAGG_) maps scalar values of multiple rows to zero, one,
+or multiple rows. The returned record may consist of one or more fields. If an output row consists of
+only one field, the row can be omitted and a scalar value can be emitted. It will be wrapped into an
+implicit row by the runtime.

Review comment:
       ```suggestion
   A user-defined table aggregate function (_UDTAGG_) maps scalar values of multiple rows to zero, one,
   or multiple rows. The returned records may consist of one or more fields. If an output row consists of
   only one field, the row can be omitted and a scalar value can be emitted, and it will be implicitly wrapped into a row by the runtime.
   ```
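
To make the "zero, one, or multiple rows" wording concrete, here is a minimal sketch of a Top2-style table aggregate. It is plain Java and deliberately does not extend `TableAggregateFunction`, so it compiles without Flink on the classpath. The names `Top2Sketch` and `Top2Accumulator` are hypothetical, and `Consumer<Integer>` is only a stand-in for the `Collector<T>` that the real `emitValue` receives. Treat it as an illustration of the contract, not as the documented example.

```java
import java.util.function.Consumer;

/** Plain-Java sketch of a Top2 table aggregate (hypothetical; not a Flink class). */
final class Top2Sketch {

    /** Accumulator holding the two largest values seen so far; null means "not set yet". */
    static final class Top2Accumulator {
        Integer first;
        Integer second;
    }

    /** Creates an empty accumulator. */
    public Top2Accumulator createAccumulator() {
        return new Top2Accumulator();
    }

    /** Updates the accumulator with one input value; null inputs are ignored. */
    public void accumulate(Top2Accumulator acc, Integer value) {
        if (value == null) {
            return;
        }
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    /** Folds the other accumulators into acc without replacing or clearing acc. */
    public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> others) {
        for (Top2Accumulator other : others) {
            accumulate(acc, other.first);
            accumulate(acc, other.second);
        }
    }

    /** Emits zero, one, or two single-field results, largest first. */
    public void emitValue(Top2Accumulator acc, Consumer<Integer> out) {
        if (acc.first != null) {
            out.accept(acc.first);
        }
        if (acc.second != null) {
            out.accept(acc.second);
        }
    }
}
```

An empty accumulator emits nothing, an accumulator that has seen one value emits one result, and any later state emits two. Each result is a bare `Integer` rather than a row, which is the single-field case the paragraph describes.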

##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -1268,693 +1044,770 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] {
     }
   }
 
-  def resetAccumulator(acc: WeightedAvgAccum): Unit = {
+  def resetAccumulator(acc: WeightedAvgAccumulator): Unit = {
     acc.count = 0
     acc.sum = 0L
   }
+}
 
-  override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
-    new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)
-  }
+val env = TableEnvironment.create(...)
 
-  override def getResultType: TypeInformation[JLong] = Types.LONG
-}
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call(classOf[WeightedAvg], $"value", $"weight"))
 
 // register function
-val tEnv: StreamTableEnvironment = ???
-tEnv.registerFunction("wAvg", new WeightedAvg())
+env.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])
 
-// use function
-tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+// call registered function in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call("WeightedAvg", $"value", $"weight"))
 
+// call registered function in SQL
+env.sqlQuery(
+  "SELECT myField, WeightedAvg(value, weight) FROM MyTable GROUP BY myField"
+)
 {% endhighlight %}
 </div>
 
-<div data-lang="python" markdown="1">
-{% highlight python %}
-'''
-Java code:
-
-/**
- * Accumulator for WeightedAvg.
- */
-public static class WeightedAvgAccum {
-    public long sum = 0;
-    public int count = 0;
-}
-
-// The java class must have a public no-argument constructor and can be founded in current java classloader.
-
-/**
- * Weighted Average user-defined aggregate function.
- */
-public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
-
-    @Override
-    public WeightedAvgAccum createAccumulator() {
-        return new WeightedAvgAccum();
-    }
-
-    @Override
-    public Long getValue(WeightedAvgAccum acc) {
-        if (acc.count == 0) {
-            return null;
-        } else {
-            return acc.sum / acc.count;
-        }
-    }
-
-    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum += iValue * iWeight;
-        acc.count += iWeight;
-    }
-
-    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum -= iValue * iWeight;
-        acc.count -= iWeight;
-    }
-    
-    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
-        Iterator<WeightedAvgAccum> iter = it.iterator();
-        while (iter.hasNext()) {
-            WeightedAvgAccum a = iter.next();
-            acc.count += a.count;
-            acc.sum += a.sum;
-        }
-    }
-    
-    public void resetAccumulator(WeightedAvgAccum acc) {
-        acc.count = 0;
-        acc.sum = 0L;
-    }
-}
-'''
-
-# register function
-t_env = ...  # type: StreamTableEnvironment
-t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
+</div>
 
-# use function
-t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
+define a class `WeightedAvgAccumulator` to be the accumulator. Accumulators are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
 
-{% endhighlight %}
-</div>
-</div>
+### Mandatory and Optional Methods
 
-{% top %}
+**The following methods are mandatory for each `AggregateFunction`:**
 
-Table Aggregation Functions
----------------------
+- `createAccumulator()`
+- `accumulate(...)` 
+- `getValue(...)`
+ 
+Additionally, there are a few methods that can be optionally implemented. While some of these methods
+allow the system to execute queries more efficiently, others are mandatory for certain use cases. For instance,
+the `merge(...)` method is mandatory if the aggregation function should be applied in the context of a
+session group window (the accumulators of two session windows need to be joined when a row is observed
+that "connects" them).
 
-User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or more rows with one or more attributes) to a result table with multi rows and columns. 
+**The following methods of `AggregateFunction` are required depending on the use case:**
 
-<center>
-<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" width="80%">
-</center>
+- `retract(...)` is required for aggregations on `OVER` windows.
+- `merge(...)` is required for many bounded aggregations and session window aggregations.
 
-The above figure shows an example of a table aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, `id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest prices of all beverages in the table, i.e., perform a `top2()` table aggregation. You would need to check each of the 5 rows and the result would be a table with the top 2 values.
+If the aggregate function can only be applied in an OVER window, this can be declared by returning the
+requirement `FunctionRequirement.OVER_WINDOW_ONLY` in `getRequirements()`.
 
-User-defined table aggregation functions are implemented by extending the `TableAggregateFunction` class. A `TableAggregateFunction` works as follows. First, it needs an `accumulator`, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the `createAccumulator()` method of the `TableAggregateFunction`. Subsequently, the `accumulate()` method of the function is called for each input row to update the accumulator. Once all rows have been processed, the `emitValue()` method of the function is called to compute and return the final results. 
+If an accumulator needs to store large amounts of data, `org.apache.flink.table.api.dataview.ListView`
+and `org.apache.flink.table.api.dataview.MapView` provide advanced features for leveraging Flink's state
+backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
+about this advanced feature.
 
-**The following methods are mandatory for each `TableAggregateFunction`:**
+Since some of the methods are optional or can be overloaded, the methods are called by generated code. The
+base class does not always provide a signature to be overridden by the concrete implementation class. Nevertheless,
+all mentioned methods must be declared publicly, not static, and named exactly as the names mentioned above
+to be called.
 
-- `createAccumulator()`
-- `accumulate()` 
+Detailed documentation for all methods that are not declared in `AggregateFunction` and called by generated
+code is given below.
 
-Flink’s type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So similar to `ScalarFunction` and `TableFunction`, `TableAggregateFunction` provides methods to specify the `TypeInformation` of the result type (through 
- `TableAggregateFunction#getResultType()`) and the type of the accumulator (through `TableAggregateFunction#getAccumulatorType()`).
- 
-Besides the above methods, there are a few contracted methods that can be 
-optionally implemented. While some of these methods allow the system more efficient query execution, others are mandatory for certain use cases. For instance, the `merge()` method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that "connects" them). 
+**`accumulate(...)`**
+<div class="codetabs" markdown="1">
 
-**The following methods of `TableAggregateFunction` are required depending on the use case:**
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ */
+public void accumulate(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-- `retract()` is required for aggregations on bounded `OVER` windows.
-- `merge()` is required for many batch aggregations and session window aggregations.
-- `resetAccumulator()` is required for many batch aggregations.
-- `emitValue()` is required for batch and window aggregations.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ */
+def accumulate(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-**The following methods of `TableAggregateFunction` are used to improve the performance of streaming jobs:**
+</div>
 
-- `emitUpdateWithRetract()` is used to emit values that have been updated under retract mode.
+**`retract(...)`**
+<div class="codetabs" markdown="1">
 
-For `emitValue` method, it emits full data according to the accumulator. Take TopN as an example, `emitValue` emit all top n values each time. This may bring performance problems for streaming jobs. To improve the performance, a user can also implement `emitUpdateWithRetract` method to improve the performance. The method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new updated ones. The method will be used in preference to the `emitValue` method if they are all defined in the table aggregate function, because `emitUpdateWithRetract` is treated to be more efficient than `emitValue` as it can output values incrementally.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ */
+public void retract(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-All methods of `TableAggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getResultType`, and `getAccumulatorType` are defined in the parent abstract class of `TableAggregateFunction`, while others are contracted methods. In order to define a table aggregate function, one has to extend the base class `org.apache.flink.table.functions.TableAggregateFunction` and implement one (or more) `accumulate` methods. The method `accumulate` can be overloaded with different parameter types and supports variable arguments.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ */
+def retract(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-Detailed documentation for all methods of `TableAggregateFunction` is given below. 
+</div>
 
+**`merge(...)`**
 <div class="codetabs" markdown="1">
+
 <div data-lang="java" markdown="1">
 {% highlight java %}
-
-/**
-  * Base class for user-defined aggregates and table aggregates.
-  *
-  * @param <T>   the type of the aggregation result.
-  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  */
-public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {
-
-  /**
-    * Creates and init the Accumulator for this (table)aggregate function.
-    *
-    * @return the accumulator with the initial value
-    */
-  public ACC createAccumulator(); // MANDATORY
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's result.
-    *
-    * @return The TypeInformation of the (table)aggregate function's result or null if the result
-    *         type should be automatically inferred.
-    */
-  public TypeInformation<T> getResultType = null; // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's accumulator.
-    *
-    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
-    *         accumulator type should be automatically inferred.
-    */
-  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
-}
-
-/**
-  * Base class for table aggregation functions. 
-  *
-  * @param <T>   the type of the aggregation result
-  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute a table aggregation result.
-  *             TableAggregateFunction represents its state using accumulator, thereby the state of
-  *             the TableAggregateFunction must be put into the accumulator.
-  */
-public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
-
-  /** Processes the input values and update the provided accumulator instance. The method
-    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
-    * requires at least one accumulate() method.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY
-
-  /**
-    * Retracts the input values from the accumulator instance. The current design assumes the
-    * inputs are the values that have been previously accumulated. The method retract can be
-    * overloaded with different custom types and arguments. This function must be implemented for
-    * datastream bounded over aggregate.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL
-
-  /**
-    * Merges a group of accumulator instances into one accumulator instance. This function must be
-    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
-    *
-    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
-    *                     be noted that the accumulator may contain the previous aggregated
-    *                     results. Therefore user should not replace or clean this instance in the
-    *                     custom merge method.
-    * @param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be
-    *                     merged.
-    */
-  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
-
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result  (periodically emitted as data arrive) or
-    * the final result of the  aggregation.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the collector used to output data
-    */
-  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
-  
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result (periodically emitted as data arrive) or
-    * the final result of the aggregation.
-    *
-    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
-    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
-    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
-    * method will be used in preference to the emitValue method if both methods are defined in the
-    * table aggregate function, because the method is treated to be more efficient than emitValue
-    * as it can outputvalues incrementally.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the retractable collector used to output data. Use collect method
-    *                    to output(add) records and use retract method to retract(delete)
-    *                    records.
-    */
-  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
-  
-  /**
-    * Collects a record and forwards it. The collector can output retract messages with the retract
-    * method. Note: only use it in {@code emitRetractValueIncrementally}.
-    */
-  public interface RetractableCollector<T> extends Collector<T> {
-
-      /**
-        * Retract a record.
-        *
-        * @param record The record to retract.
-        */
-      void retract(T record);
-  }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore, users should not replace or clear this instance in the
+ *                    custom merge method.
+ * param: iterable    a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ */
+public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-/**
-  * Base class for user-defined aggregates and table aggregates.
-  *
-  * @tparam T   the type of the aggregation result.
-  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  */
-abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction {
-
-  /**
-    * Creates and init the Accumulator for this (table)aggregate function.
-    *
-    * @return the accumulator with the initial value
-    */
-  def createAccumulator(): ACC // MANDATORY
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's result.
-    *
-    * @return The TypeInformation of the (table)aggregate function's result or null if the result
-    *         type should be automatically inferred.
-    */
-  def getResultType: TypeInformation[T] = null // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's accumulator.
-    *
-    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
-    *         accumulator type should be automatically inferred.
-    */
-  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
-}
-
-/**
-  * Base class for table aggregation functions. 
-  *
-  * @tparam T   the type of the aggregation result
-  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  *             TableAggregateFunction represents its state using accumulator, thereby the state of
-  *             the TableAggregateFunction must be put into the accumulator.
-  */
-abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, ACC] {
-
-  /**
-    * Processes the input values and update the provided accumulator instance. The method
-    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
-    * requires at least one accumulate() method.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY
-
-  /**
-    * Retracts the input values from the accumulator instance. The current design assumes the
-    * inputs are the values that have been previously accumulated. The method retract can be
-    * overloaded with different custom types and arguments. This function must be implemented for
-    * datastream bounded over aggregate.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL
-
-  /**
-    * Merges a group of accumulator instances into one accumulator instance. This function must be
-    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
-    *
-    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
-    *                     be noted that the accumulator may contain the previous aggregated
-    *                     results. Therefore user should not replace or clean this instance in the
-    *                     custom merge method.
-    * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
-    *                     merged.
-    */
-  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL
-  
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result  (periodically emitted as data arrive) or
-    * the final result of the  aggregation.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the collector used to output data
-    */
-  def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL
-
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result (periodically emitted as data arrive) or
-    * the final result of the aggregation.
-    *
-    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
-    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
-    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
-    * method will be used in preference to the emitValue method if both methods are defined in the
-    * table aggregate function, because the method is treated to be more efficient than emitValue
-    * as it can outputvalues incrementally.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the retractable collector used to output data. Use collect method
-    *                    to output(add) records and use retract method to retract(delete)
-    *                    records.
-    */
-  def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit // OPTIONAL
- 
-  /**
-    * Collects a record and forwards it. The collector can output retract messages with the retract
-    * method. Note: only use it in `emitRetractValueIncrementally`.
-    */
-  trait RetractableCollector[T] extends Collector[T] {
-    
-    /**
-      * Retract a record.
-      *
-      * @param record The record to retract.
-      */
-    def retract(record: T): Unit
-  }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore, users should not replace or clear this instance in the
+ *                    custom merge method.
+ * param: iterable    a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ */
+def merge(accumulator: ACC, iterable: java.lang.Iterable[ACC]): Unit
 {% endhighlight %}
 </div>
+
 </div>
 
+{% top %}
+
+Table Aggregate Functions
+-------------------------
+
+A user-defined table aggregate function (_UDTAGG_) maps scalar values of multiple rows to zero, one,
+or multiple rows. The returned record may consist of one or more fields. If an output row consists of
+only one field, the row can be omitted and a scalar value can be emitted. It will be wrapped into an
+implicit row by the runtime.
 
-The following example shows how to
+Similar to an [aggregate function](#aggregate-functions), the behavior of a table aggregate is centered
+around the concept of an accumulator. The accumulator is an intermediate data structure that stores
+the aggregated values until a final aggregation result is computed.
 
-- define a `TableAggregateFunction` that calculates the top 2 values on a given column, 
-- register the function in the `TableEnvironment`, and 
-- use the function in a Table API query(TableAggregateFunction is only supported by Table API).  
+For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling
+`createAccumulator()`. Subsequently, the `accumulate(...)` method of the function is called for each
+input row to update the accumulator. Once all rows have been processed, the `emitValue(...)` or `emitUpdateWithRetract(...)`
+method of the function is called to compute and return the final result.
 
-To calculate the top 2 values, the accumulator needs to store the biggest 2 values of all the data that has been accumulated. In our example we define a class `Top2Accum` to be the accumulator. Accumulators are automatically backup-ed by Flink's checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics.
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDTAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" width="80%">
+</center>
 
-The `accumulate()` method of our `Top2` `TableAggregateFunction` has two inputs. The first one is the `Top2Accum` accumulator, the other one is the user-defined input: input value `v`. Although the `merge()` method is not mandatory for most table aggregation types, we provide it below as examples. Please note that we used Java primitive types and defined `getResultType()` and `getAccumulatorType()` methods in the Scala example because Flink type extraction does not work very well for Scala types.
+In the example, we assume a table that contains data about beverages. The table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the 2 highest prices of all beverages in the table, i.e.,
+perform a `TOP2()` table aggregation. We need to consider each of the 5 rows. The result is a table
+with the top 2 values.
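The lifecycle described above can be traced without a Flink runtime. The following is a minimal plain-Java sketch, not Flink code: the class `Top2Walkthrough`, its static methods, and the five prices are hypothetical stand-ins chosen for illustration, and the values shown in the figure may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the TOP2 lifecycle; no Flink classes are involved.
final class Top2Walkthrough {

  // stand-in for the accumulator: the two highest values seen so far
  static final class Acc {
    int first = Integer.MIN_VALUE;
    int second = Integer.MIN_VALUE;
  }

  // corresponds to createAccumulator(): one empty accumulator per group
  static Acc createAccumulator() {
    return new Acc();
  }

  // corresponds to accumulate(...): called once per input row
  static void accumulate(Acc acc, int value) {
    if (value > acc.first) {
      acc.second = acc.first;
      acc.first = value;
    } else if (value > acc.second) {
      acc.second = value;
    }
  }

  // corresponds to emitValue(...): returns {value, rank} pairs
  static List<int[]> emitValue(Acc acc) {
    List<int[]> out = new ArrayList<>();
    if (acc.first != Integer.MIN_VALUE) {
      out.add(new int[] {acc.first, 1});
    }
    if (acc.second != Integer.MIN_VALUE) {
      out.add(new int[] {acc.second, 2});
    }
    return out;
  }

  // runs the whole lifecycle for one group of rows
  static List<int[]> top2(int[] prices) {
    Acc acc = createAccumulator();
    for (int price : prices) {
      accumulate(acc, price);
    }
    return emitValue(acc);
  }

  public static void main(String[] args) {
    int[] prices = {6, 3, 7, 4, 8}; // hypothetical prices of 5 beverages
    for (int[] row : top2(prices)) {
      System.out.println("value=" + row[0] + ", rank=" + row[1]);
    }
  }
}
```

Five input rows produce at most two output rows, which is what distinguishes a table aggregate from a scalar aggregate that always returns a single value.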
+
+In order to define a table aggregate function, one has to extend the base class `TableAggregateFunction` in
+`org.apache.flink.table.functions` and implement one or more evaluation methods named `accumulate(...)`.
+An accumulate method must be declared public and non-static. Accumulate methods can also be overloaded
+by implementing multiple methods named `accumulate`.
+
+By default, input, accumulator, and output data types are automatically extracted using reflection. This
+includes the generic argument `ACC` of the class for determining an accumulator data type and the generic
+argument `T` for determining an output data type. Input arguments are derived from one or more
+`accumulate(...)` methods. See the [Implementation Guide](#implementation-guide) for more details.
+
+If you intend to implement or call functions in Python, please refer to the [Python Functions]({% link dev/table/python/python_udfs.md %})
+documentation for more details.
+
+The following example shows how to define your own table aggregate function and call it in a query.
 
 <div class="codetabs" markdown="1">
+
 <div data-lang="java" markdown="1">
 {% highlight java %}
-/**
- * Accumulator for Top2.
- */
-public class Top2Accum {
-    public Integer first;
-    public Integer second;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.TableAggregateFunction;
+import org.apache.flink.util.Collector;
+import static org.apache.flink.table.api.Expressions.*;
+
+// mutable accumulator of structured type for the aggregate function
+public static class Top2Accumulator {
+  public Integer first;
+  public Integer second;
 }
 
-/**
- * The top2 user-defined table aggregate function.
- */
-public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
-
-    @Override
-    public Top2Accum createAccumulator() {
-        Top2Accum acc = new Top2Accum();
-        acc.first = Integer.MIN_VALUE;
-        acc.second = Integer.MIN_VALUE;
-        return acc;
-    }
+// function that takes (value INT), stores intermediate results in a structured
+// type of Top2Accumulator, and returns the result as a structured type of Tuple2<Integer, Integer>
+// for value and rank
+public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {
 
+  @Override
+  public Top2Accumulator createAccumulator() {
+    Top2Accumulator acc = new Top2Accumulator();
+    acc.first = Integer.MIN_VALUE;
+    acc.second = Integer.MIN_VALUE;
+    return acc;
+  }
 
-    public void accumulate(Top2Accum acc, Integer v) {
-        if (v > acc.first) {
-            acc.second = acc.first;
-            acc.first = v;
-        } else if (v > acc.second) {
-            acc.second = v;
-        }
+  public void accumulate(Top2Accumulator acc, Integer value) {
+    if (value > acc.first) {
+      acc.second = acc.first;
+      acc.first = value;
+    } else if (value > acc.second) {
+      acc.second = value;
     }
+  }
 
-    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
-        for (Top2Accum otherAcc : iterable) {
-            accumulate(acc, otherAcc.first);
-            accumulate(acc, otherAcc.second);
-        }
+  public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
+    for (Top2Accumulator otherAcc : it) {
+      accumulate(acc, otherAcc.first);
+      accumulate(acc, otherAcc.second);
     }
+  }
 
-    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
-        // emit the value and rank
-        if (acc.first != Integer.MIN_VALUE) {
-            out.collect(Tuple2.of(acc.first, 1));
-        }
-        if (acc.second != Integer.MIN_VALUE) {
-            out.collect(Tuple2.of(acc.second, 2));
-        }
+  public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
+    // emit the value and rank
+    if (acc.first != Integer.MIN_VALUE) {
+      out.collect(Tuple2.of(acc.first, 1));
     }
+    if (acc.second != Integer.MIN_VALUE) {
+      out.collect(Tuple2.of(acc.second, 2));
+    }
+  }
 }
 
-// register function
-StreamTableEnvironment tEnv = ...
-tEnv.registerFunction("top2", new Top2());
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .groupBy($("myField"))
+  .flatAggregate(call(Top2.class, $("value")))
+  .select($("myField"), $("f0"), $("f1"));
 
-// init table
-Table tab = ...;
+// call function "inline" without registration in Table API
+// but use an alias for a better naming of Tuple2's fields
+env
+  .from("MyTable")
+  .groupBy($("myField"))
+  .flatAggregate(call(Top2.class, $("value")).as("value", "rank"))
+  .select($("myField"), $("value"), $("rank"));
 
-// use function
-tab.groupBy("key")
-    .flatAggregate("top2(a) as (v, rank)")
-    .select("key, v, rank");
+// register function
+env.createTemporarySystemFunction("Top2", Top2.class);
+
+// call registered function in Table API
+env
+  .from("MyTable")
+  .groupBy($("myField"))
+  .flatAggregate(call("Top2", $("value")).as("value", "rank"))
+  .select($("myField"), $("value"), $("rank"));
 
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import java.lang.{Integer => JInteger}
-import org.apache.flink.table.api.Types
+import java.lang.Integer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.table.api._
 import org.apache.flink.table.functions.TableAggregateFunction
+import org.apache.flink.util.Collector
 
-/**
- * Accumulator for top2.
- */
-class Top2Accum {
-  var first: JInteger = _
-  var second: JInteger = _
-}
+// mutable accumulator of structured type for the aggregate function
+case class Top2Accumulator(
+  var first: Integer,
+  var second: Integer
+)
 
-/**
- * The top2 user-defined table aggregate function.
- */
-class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {
+// function that takes (value INT), stores intermediate results in a structured
+// type of Top2Accumulator, and returns the result as a structured type of Tuple2[Integer, Integer]
+// for value and rank
+class Top2 extends TableAggregateFunction[Tuple2[Integer, Integer], Top2Accumulator] {
 
-  override def createAccumulator(): Top2Accum = {
-    val acc = new Top2Accum
-    acc.first = Int.MinValue
-    acc.second = Int.MinValue
-    acc
+  override def createAccumulator(): Top2Accumulator = {
+    Top2Accumulator(
+      Integer.MIN_VALUE,
+      Integer.MIN_VALUE
+    )
   }
 
-  def accumulate(acc: Top2Accum, v: Int) {
-    if (v > acc.first) {
+  def accumulate(acc: Top2Accumulator, value: Integer): Unit = {
+    if (value > acc.first) {
       acc.second = acc.first
-      acc.first = v
-    } else if (v > acc.second) {
-      acc.second = v
+      acc.first = value
+    } else if (value > acc.second) {
+      acc.second = value
     }
   }
 
-  def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
-    val iter = its.iterator()
+  def merge(acc: Top2Accumulator, it: java.lang.Iterable[Top2Accumulator]): Unit = {
+    val iter = it.iterator()
     while (iter.hasNext) {
-      val top2 = iter.next()
-      accumulate(acc, top2.first)
-      accumulate(acc, top2.second)
+      val otherAcc = iter.next()
+      accumulate(acc, otherAcc.first)
+      accumulate(acc, otherAcc.second)
     }
   }
 
-  def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = {
+  def emitValue(acc: Top2Accumulator, out: Collector[Tuple2[Integer, Integer]]): Unit = {
     // emit the value and rank
-    if (acc.first != Int.MinValue) {
-      out.collect(JTuple2.of(acc.first, 1))
+    if (acc.first != Integer.MIN_VALUE) {
+      out.collect(Tuple2.of(acc.first, 1))
     }
-    if (acc.second != Int.MinValue) {
-      out.collect(JTuple2.of(acc.second, 2))
+    if (acc.second != Integer.MIN_VALUE) {
+      out.collect(Tuple2.of(acc.second, 2))
     }
   }
 }
 
-// init table
-val tab = ...
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .flatAggregate(call(classOf[Top2], $"value"))
+  .select($"myField", $"f0", $"f1")
 
-// use function
-tab
-  .groupBy('key)
-  .flatAggregate(top2('a) as ('v, 'rank))
-  .select('key, 'v, 'rank)
+// call function "inline" without registration in Table API
+// but use an alias for a better naming of Tuple2's fields
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .flatAggregate(call(classOf[Top2], $"value").as("value", "rank"))
+  .select($"myField", $"value", $"rank")
 
+// register function
+env.createTemporarySystemFunction("Top2", classOf[Top2])
+
+// call registered function in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .flatAggregate(call("Top2", $"value").as("value", "rank"))
+  .select($"myField", $"value", $"rank")
 {% endhighlight %}
 </div>
+
 </div>
 
+The `accumulate(...)` method of our `Top2` class takes two inputs. The first one is the accumulator
+and the second one is the user-defined input. In order to calculate a result, the accumulator needs to
+store the 2 highest values of all the data that has been accumulated. Accumulators are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
+The result values are emitted together with a ranking index.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `TableAggregateFunction`:**
+
+- `createAccumulator()`
+- `accumulate(...)`
+- `emitValue(...)` or `emitUpdateWithRetract(...)`
+
+Additionally, there are a few methods that can be optionally implemented. While some of these methods
+allow the system to execute queries more efficiently, others are mandatory for certain use cases. For instance,
+the `merge(...)` method is mandatory if the aggregation function should be applied in the context of a
+session group window (the accumulators of two session windows need to be joined when a row is observed
+that "connects" them).
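How two accumulators are joined can be sketched in plain Java. This is an illustrative stand-in, not Flink code: `MergeSketch`, `Acc`, and the helper `of(...)` are hypothetical names, and the merge logic simply mirrors the `Top2` example above by re-accumulating the values held by the other accumulators.

```java
import java.util.Arrays;

// Plain-Java sketch of merging Top2-style accumulators; no Flink classes are involved.
final class MergeSketch {

  static final class Acc {
    int first = Integer.MIN_VALUE;
    int second = Integer.MIN_VALUE;
  }

  static void accumulate(Acc acc, int value) {
    if (value > acc.first) {
      acc.second = acc.first;
      acc.first = value;
    } else if (value > acc.second) {
      acc.second = value;
    }
  }

  // Merges into the given accumulator without replacing or resetting it,
  // because it may already hold previously aggregated results.
  static void merge(Acc acc, Iterable<Acc> others) {
    for (Acc other : others) {
      // unset slots hold Integer.MIN_VALUE and therefore never win a comparison
      accumulate(acc, other.first);
      accumulate(acc, other.second);
    }
  }

  // hypothetical helper that builds an accumulator from some values
  static Acc of(int... values) {
    Acc acc = new Acc();
    for (int value : values) {
      accumulate(acc, value);
    }
    return acc;
  }

  public static void main(String[] args) {
    Acc sessionA = of(5, 9); // accumulator of the first session window
    Acc sessionB = of(7);    // accumulator of the second session window
    merge(sessionA, Arrays.asList(sessionB));
    System.out.println(sessionA.first + ", " + sessionA.second);
  }
}
```

After the merge, `sessionA` holds the top 2 values of both windows combined, as if all rows had been accumulated into a single window from the start.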
+
+**The following methods of `TableAggregateFunction` are required depending on the use case:**
+
+- `retract(...)` is required for aggregations on `OVER` windows.
+- `merge(...)` is required for many bounded aggregations and session window aggregations.
+- `emitValue(...)` is required for bounded and window aggregations.
+
+**The following methods of `TableAggregateFunction` are used to improve the performance of streaming jobs:**
+
+- `emitUpdateWithRetract(...)` is used to emit values that have been updated under retract mode.
+
+The `emitValue(...)` method always emits the full data according to the accumulator. In unbounded scenarios,
+this may bring performance problems. Take a Top N function as an example. The `emitValue(...)` would emit
+all N values each time. In order to improve the performance, one can implement `emitUpdateWithRetract(...)` which
+outputs data incrementally in retract mode. In other words, once there is an update, the method can retract
+old records before sending new, updated ones. The method will be used in preference to the `emitValue(...)`
+method.
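The difference between full and incremental emission can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Flink code: `RetractSink` is a hypothetical stand-in for a retractable collector, and the accumulator remembers the previously emitted values (`oldFirst`, `oldSecond`) so that only changed ranks are retracted and re-emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of incremental emission in retract mode; no Flink classes are involved.
final class IncrementalEmitSketch {

  // hypothetical stand-in for a retractable collector
  interface RetractSink {
    void collect(int value, int rank);
    void retract(int value, int rank);
  }

  static final class Acc {
    int first = Integer.MIN_VALUE;
    int second = Integer.MIN_VALUE;
    int oldFirst = Integer.MIN_VALUE;  // last emitted value for rank 1
    int oldSecond = Integer.MIN_VALUE; // last emitted value for rank 2
  }

  static void accumulate(Acc acc, int value) {
    if (value > acc.first) {
      acc.second = acc.first;
      acc.first = value;
    } else if (value > acc.second) {
      acc.second = value;
    }
  }

  // emits only the ranks whose value changed since the last emission
  static void emitUpdateWithRetract(Acc acc, RetractSink out) {
    if (acc.first != acc.oldFirst) {
      if (acc.oldFirst != Integer.MIN_VALUE) {
        out.retract(acc.oldFirst, 1);
      }
      out.collect(acc.first, 1);
      acc.oldFirst = acc.first;
    }
    if (acc.second != acc.oldSecond) {
      if (acc.oldSecond != Integer.MIN_VALUE) {
        out.retract(acc.oldSecond, 2);
      }
      out.collect(acc.second, 2);
      acc.oldSecond = acc.second;
    }
  }

  // records every message as a string for inspection
  static final class Log implements RetractSink {
    final List<String> messages = new ArrayList<>();

    @Override
    public void collect(int value, int rank) {
      messages.add("+(" + value + "," + rank + ")");
    }

    @Override
    public void retract(int value, int rank) {
      messages.add("-(" + value + "," + rank + ")");
    }
  }

  public static void main(String[] args) {
    Acc acc = new Acc();
    Log log = new Log();
    for (int value : new int[] {5, 3, 4, 1}) {
      accumulate(acc, value);
      emitUpdateWithRetract(acc, log);
    }
    System.out.println(log.messages);
  }
}
```

An input that does not change the top 2 (the value `1` in `main`) produces no messages at all, whereas a full emission would repeat both rows after every input.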
+
+If the table aggregate function can only be applied in an OVER window, this can be declared by returning the
+requirement `FunctionRequirement.OVER_WINDOW_ONLY` in `getRequirements()`.
+
+If an accumulator needs to store large amounts of data, `org.apache.flink.table.api.dataview.ListView`
+and `org.apache.flink.table.api.dataview.MapView` provide advanced features for leveraging Flink's state
+backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
+about this advanced feature.
+
+Since some of the methods are optional or can be overloaded, the methods are called by generated code. The
+base class does not always provide a signature to be overridden by the concrete implementation class. Nevertheless,
+all mentioned methods must be declared public, non-static, and named exactly as mentioned above
+to be called.
 
-The following example shows how to use `emitUpdateWithRetract` method to emit only updates. To emit only updates, in our example, the accumulator keeps both old and new top 2 values. Note: if the N of topN is big, it may inefficient to keep both old and new values. One way to solve this case is to store the input record into the accumulator in `accumulate` method and then perform calculation in `emitUpdateWithRetract`.
+Detailed documentation for all methods that are not declared in `TableAggregateFunction` and called by generated
+code is given below.
 
+**`accumulate(...)`**
 <div class="codetabs" markdown="1">
+
 <div data-lang="java" markdown="1">
 {% highlight java %}
-/**
- * Accumulator for Top2.
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  */
-public class Top2Accum {
-    public Integer first;
-    public Integer second;
-    public Integer oldFirst;
-    public Integer oldSecond;
-}
+public void accumulate(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-/**
- * The top2 user-defined table aggregate function.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  */
-public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
+def accumulate(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-    @Override
-    public Top2Accum createAccumulator() {
-        Top2Accum acc = new Top2Accum();
-        acc.first = Integer.MIN_VALUE;
-        acc.second = Integer.MIN_VALUE;
-        acc.oldFirst = Integer.MIN_VALUE;
-        acc.oldSecond = Integer.MIN_VALUE;
-        return acc;
-    }
+</div>
 
-    public void accumulate(Top2Accum acc, Integer v) {
-        if (v > acc.first) {
-            acc.second = acc.first;
-            acc.first = v;
-        } else if (v > acc.second) {
-            acc.second = v;
-        }
-    }
+**`retract(...)`**
+<div class="codetabs" markdown="1">
 
-    public void emitUpdateWithRetract(Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
-        if (!acc.first.equals(acc.oldFirst)) {
-            // if there is an update, retract old value then emit new value.
-            if (acc.oldFirst != Integer.MIN_VALUE) {
-                out.retract(Tuple2.of(acc.oldFirst, 1));
-            }
-            out.collect(Tuple2.of(acc.first, 1));
-            acc.oldFirst = acc.first;
-        }
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ */
+public void retract(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-        if (!acc.second.equals(acc.oldSecond)) {
-            // if there is an update, retract old value then emit new value.
-            if (acc.oldSecond != Integer.MIN_VALUE) {
-                out.retract(Tuple2.of(acc.oldSecond, 2));
-            }
-            out.collect(Tuple2.of(acc.second, 2));
-            acc.oldSecond = acc.second;
-        }
-    }
-}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from newly arrived data).
+ */
+def retract(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-// register function
-StreamTableEnvironment tEnv = ...
-tEnv.registerFunction("top2", new Top2());
+</div>
+
+**`merge(...)`**
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore, users should not replace or clear this instance in the
+ *                    custom merge method.
+ * param: iterable    a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ */
+public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
+{% endhighlight %}
+</div>
 
-// init table
-Table tab = ...;
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore, users should not replace or clear this instance in the
+ *                    custom merge method.
+ * param: iterable    a java.lang.Iterable pointing to a group of accumulators that will be
+ *                    merged.
+ */
+def merge(accumulator: ACC, iterable: java.lang.Iterable[ACC]): Unit
+{% endhighlight %}
+</div>
 
-// use function
-tab.groupBy("key")
-    .flatAggregate("top2(a) as (v, rank)")
-    .select("key, v, rank");
+</div>
 
+**`emitValue(...)`**
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Called every time when an aggregation result should be materialized. The returned value could
+ * be either an early and incomplete result (periodically emitted as data arrives) or the final
+ * result of the aggregation.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: out                   the collector used to output data.
+ */
+public void emitValue(ACC accumulator, org.apache.flink.util.Collector<T> out)
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import java.lang.{Integer => JInteger}
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.TableAggregateFunction
+/*
+ * Called every time when an aggregation result should be materialized. The returned value could
+ * be either an early and incomplete result (periodically emitted as data arrives) or the final
+ * result of the aggregation.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: out                   the collector used to output data.
+ */
+def emitValue(accumulator: ACC, out: org.apache.flink.util.Collector[T]): Unit
+{% endhighlight %}
+</div>
 
-/**
- * Accumulator for top2.
+</div>
+
+**`emitUpdateWithRetract(...)`**
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Called every time when an aggregation result should be materialized. The returned value could
+ * be either an early and incomplete result (periodically emitted as data arrives) or the final
+ * result of the aggregation.
+ *
+ * Compared to emitValue(), emitUpdateWithRetract() is used to emit values that have been updated. This method
+ * outputs data incrementally in retraction mode (also known as "update before" and "update after"). Once
+ * there is an update, we have to retract old records before sending new updated ones. The emitUpdateWithRetract()
+ * method will be used in preference to the emitValue() method if both methods are defined in the table aggregate
+ * function, because the method is considered more efficient than emitValue() as it can output
+ * values incrementally.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: out                   the retractable collector used to output data. Use the collect() method
+ *                              to output (add) records and use the retract() method to retract (delete)
+ *                              records.
  */
-class Top2Accum {
-  var first: JInteger = _
-  var second: JInteger = _
-  var oldFirst: JInteger = _
-  var oldSecond: JInteger = _
-}
+public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out)
+{% endhighlight %}
+</div>
 
-/**
- * The top2 user-defined table aggregate function.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Called every time when an aggregation result should be materialized. The returned value could
+ * be either an early and incomplete result (periodically emitted as data arrives) or the final
+ * result of the aggregation.
+ *
+ * Compared to emitValue(), emitUpdateWithRetract() is used to emit values that have been updated. This method
+ * outputs data incrementally in retraction mode (also known as "update before" and "update after"). Once
+ * there is an update, we have to retract old records before sending new updated ones. The emitUpdateWithRetract()
+ * method will be used in preference to the emitValue() method if both methods are defined in the table aggregate
+ * function, because the method is treated to be more efficient than emitValue as it can output
+ * values incrementally.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: out                   the retractable collector used to output data. Use the collect() method
+ *                              to output(add) records and use retract method to retract(delete)
+ *                              records.
  */
-class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {
-
-  override def createAccumulator(): Top2Accum = {
-    val acc = new Top2Accum
-    acc.first = Int.MinValue
-    acc.second = Int.MinValue
-    acc.oldFirst = Int.MinValue
-    acc.oldSecond = Int.MinValue
-    acc
+def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit
+{% endhighlight %}
+</div>
+
+</div>
+
+### Retraction Example
+
+The following example shows how to use the `emitUpdateWithRetract(...)` method to emit only incremental
+updates. In order to do so, the accumulator keeps both the old and new top 2 values.
+
+_Note_: If the N of Top N is big, it might be inefficient to keep both the old and new values. One way to

Review comment:
       ```suggestion
   If the N of Top N is big, it might be inefficient to keep both the old and new values. One way to
   ```
   
   I don't think we need the note comment because this is what we're trying to show. 
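
For reference, the old/new bookkeeping that this retraction example relies on can be sketched in plain Java. This is only an illustration under stated assumptions: `SketchRetractableCollector` is a hypothetical stand-in for Flink's `RetractableCollector`, the class and the `run(...)` helper are invented for this sketch, and `Integer.MIN_VALUE` is used as an "unset" marker. The accumulator remembers the values it emitted last, so each call only retracts and re-emits the ranks that actually changed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's RetractableCollector, so the sketch runs without Flink.
interface SketchRetractableCollector<T> {
    void collect(T record);
    void retract(T record);
}

// Accumulator that keeps both the current and the previously emitted top 2 values.
// Integer.MIN_VALUE marks "no value yet" (an assumption of this sketch).
class Top2Accumulator {
    int first = Integer.MIN_VALUE;
    int second = Integer.MIN_VALUE;
    int oldFirst = Integer.MIN_VALUE;
    int oldSecond = Integer.MIN_VALUE;
}

class Top2RetractSketch {

    // Updates the current top 2 values with a new input value.
    static void accumulate(Top2Accumulator acc, int value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    // Emits only the ranks that changed since the last call:
    // retract the old record first, then collect the new one.
    static void emitUpdateWithRetract(Top2Accumulator acc, SketchRetractableCollector<String> out) {
        if (acc.first != acc.oldFirst) {
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract("rank1=" + acc.oldFirst);
            }
            out.collect("rank1=" + acc.first);
            acc.oldFirst = acc.first;
        }
        if (acc.second != acc.oldSecond) {
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract("rank2=" + acc.oldSecond);
            }
            out.collect("rank2=" + acc.second);
            acc.oldSecond = acc.second;
        }
    }

    // Feeds the values one by one and records every emitted message:
    // "+" marks a collected record, "-" marks a retracted record.
    static List<String> run(int... values) {
        final List<String> log = new ArrayList<>();
        SketchRetractableCollector<String> out = new SketchRetractableCollector<String>() {
            @Override
            public void collect(String record) {
                log.add("+" + record);
            }

            @Override
            public void retract(String record) {
                log.add("-" + record);
            }
        };
        Top2Accumulator acc = new Top2Accumulator();
        for (int value : values) {
            accumulate(acc, value);
            emitUpdateWithRetract(acc, out);
        }
        return log;
    }
}
```

With the inputs 3, 5, 4, 1 the sketch logs `+rank1=3`, `-rank1=3`, `+rank1=5`, `+rank2=3`, `-rank2=3`, `+rank2=4`; the last input changes nothing, so nothing is emitted for it. The cost is four fields for N = 2, which is the overhead the sentence under discussion warns about for a large N.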







[GitHub] [flink] flinkbot edited a comment on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673578366


   ## CI report:
   
   * e3f355ba1e729543a1b902c6f3fba3c6836f5023 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512) 
   * 47d9b86d8a44e691fd0779b66bb702d3424aa886 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5525) 
   * 9c1a06719b7ee5be9dbe0c1c1a812991bca0349c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5530) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr commented on a change in pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13143:
URL: https://github.com/apache/flink/pull/13143#discussion_r470515620



##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -1268,693 +1044,770 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] {
     }
   }
 
-  def resetAccumulator(acc: WeightedAvgAccum): Unit = {
+  def resetAccumulator(acc: WeightedAvgAccumulator): Unit = {
     acc.count = 0
     acc.sum = 0L
   }
+}
 
-  override def getAccumulatorType: TypeInformation[WeightedAvgAccum] = {
-    new TupleTypeInfo(classOf[WeightedAvgAccum], Types.LONG, Types.INT)
-  }
+val env = TableEnvironment.create(...)
 
-  override def getResultType: TypeInformation[JLong] = Types.LONG
-}
+// call function "inline" without registration in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call(classOf[WeightedAvg], $"value", $"weight"))
 
 // register function
-val tEnv: StreamTableEnvironment = ???
-tEnv.registerFunction("wAvg", new WeightedAvg())
+env.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])
 
-// use function
-tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+// call registered function in Table API
+env
+  .from("MyTable")
+  .groupBy($"myField")
+  .select($"myField", call("WeightedAvg", $"value", $"weight"))
 
+// call registered function in SQL
+env.sqlQuery(
+  "SELECT myField, WeightedAvg(value, weight) FROM MyTable GROUP BY myField"
+)
 {% endhighlight %}
 </div>
 
-<div data-lang="python" markdown="1">
-{% highlight python %}
-'''
-Java code:
-
-/**
- * Accumulator for WeightedAvg.
- */
-public static class WeightedAvgAccum {
-    public long sum = 0;
-    public int count = 0;
-}
-
-// The java class must have a public no-argument constructor and can be founded in current java classloader.
-
-/**
- * Weighted Average user-defined aggregate function.
- */
-public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
-
-    @Override
-    public WeightedAvgAccum createAccumulator() {
-        return new WeightedAvgAccum();
-    }
-
-    @Override
-    public Long getValue(WeightedAvgAccum acc) {
-        if (acc.count == 0) {
-            return null;
-        } else {
-            return acc.sum / acc.count;
-        }
-    }
-
-    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum += iValue * iWeight;
-        acc.count += iWeight;
-    }
-
-    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
-        acc.sum -= iValue * iWeight;
-        acc.count -= iWeight;
-    }
-    
-    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
-        Iterator<WeightedAvgAccum> iter = it.iterator();
-        while (iter.hasNext()) {
-            WeightedAvgAccum a = iter.next();
-            acc.count += a.count;
-            acc.sum += a.sum;
-        }
-    }
-    
-    public void resetAccumulator(WeightedAvgAccum acc) {
-        acc.count = 0;
-        acc.sum = 0L;
-    }
-}
-'''
-
-# register function
-t_env = ...  # type: StreamTableEnvironment
-t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
+</div>
 
-# use function
-t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
+define a class `WeightedAvgAccumulator` to be the accumulator. Accumulators are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
 
-{% endhighlight %}
-</div>
-</div>
+### Mandatory and Optional Methods
 
-{% top %}
+**The following methods are mandatory for each `AggregateFunction`:**
 
-Table Aggregation Functions
----------------------
+- `createAccumulator()`
+- `accumulate(...)` 
+- `getValue(...)`
+ 
+Additionally, there are a few methods that can be optionally implemented. While some of these methods
+allow the system more efficient query execution, others are mandatory for certain use cases. For instance,
+the `merge(...)` method is mandatory if the aggregation function should be applied in the context of a
+session group window (the accumulators of two session windows need to be joined when a row is observed
+that "connects" them).
 
-User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or more rows with one or more attributes) to a result table with multi rows and columns. 
+**The following methods of `AggregateFunction` are required depending on the use case:**
 
-<center>
-<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" width="80%">
-</center>
+- `retract(...)` is required for aggregations on `OVER` windows.
+- `merge(...)` is required for many bounded aggregations and session window aggregations.
 
-The above figure shows an example of a table aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, `id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest prices of all beverages in the table, i.e., perform a `top2()` table aggregation. You would need to check each of the 5 rows and the result would be a table with the top 2 values.
+If the aggregate function can only be applied in an OVER window, this can be declared by returning the
+requirement `FunctionRequirement.OVER_WINDOW_ONLY` in `getRequirements()`.
 
-User-defined table aggregation functions are implemented by extending the `TableAggregateFunction` class. A `TableAggregateFunction` works as follows. First, it needs an `accumulator`, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the `createAccumulator()` method of the `TableAggregateFunction`. Subsequently, the `accumulate()` method of the function is called for each input row to update the accumulator. Once all rows have been processed, the `emitValue()` method of the function is called to compute and return the final results. 
+If an accumulator needs to store large amounts of data, `org.apache.flink.table.api.dataview.ListView`
+and `org.apache.flink.table.api.dataview.MapView` provide advanced features for leveraging Flink's state
+backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information
+about this advanced feature.
 
-**The following methods are mandatory for each `TableAggregateFunction`:**
+Since some of methods are optional or can be overloaded, the methods are called by generated code. The
+base class does not always provide a signature to be overridden by the concrete implementation class. Nevertheless,
+all mentioned methods must be declared publicly, not static, and named exactly as the names mentioned above
+to be called.
 
-- `createAccumulator()`
-- `accumulate()` 
+Detailed documentation for all methods that are not declared in `AggregateFunction` and called by generated
+code is given below.
 
-Flink’s type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So similar to `ScalarFunction` and `TableFunction`, `TableAggregateFunction` provides methods to specify the `TypeInformation` of the result type (through 
- `TableAggregateFunction#getResultType()`) and the type of the accumulator (through `TableAggregateFunction#getAccumulatorType()`).
- 
-Besides the above methods, there are a few contracted methods that can be 
-optionally implemented. While some of these methods allow the system more efficient query execution, others are mandatory for certain use cases. For instance, the `merge()` method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that "connects" them). 
+**`accumulate(...)`**
+<div class="codetabs" markdown="1">
 
-**The following methods of `TableAggregateFunction` are required depending on the use case:**
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+public void accumulate(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-- `retract()` is required for aggregations on bounded `OVER` windows.
-- `merge()` is required for many batch aggregations and session window aggregations.
-- `resetAccumulator()` is required for many batch aggregations.
-- `emitValue()` is required for batch and window aggregations.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Processes the input values and updates the provided accumulator instance. The method
+ * accumulate can be overloaded with different custom types and arguments. An aggregate function
+ * requires at least one accumulate() method.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+def accumulate(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-**The following methods of `TableAggregateFunction` are used to improve the performance of streaming jobs:**
+</div>
 
-- `emitUpdateWithRetract()` is used to emit values that have been updated under retract mode.
+**`retract(...)`**
+<div class="codetabs" markdown="1">
 
-For `emitValue` method, it emits full data according to the accumulator. Take TopN as an example, `emitValue` emit all top n values each time. This may bring performance problems for streaming jobs. To improve the performance, a user can also implement `emitUpdateWithRetract` method to improve the performance. The method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new updated ones. The method will be used in preference to the `emitValue` method if they are all defined in the table aggregate function, because `emitUpdateWithRetract` is treated to be more efficient than `emitValue` as it can output values incrementally.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+public void retract(ACC accumulator, [user defined inputs])
+{% endhighlight %}
+</div>
 
-All methods of `TableAggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getResultType`, and `getAccumulatorType` are defined in the parent abstract class of `TableAggregateFunction`, while others are contracted methods. In order to define a table aggregate function, one has to extend the base class `org.apache.flink.table.functions.TableAggregateFunction` and implement one (or more) `accumulate` methods. The method `accumulate` can be overloaded with different parameter types and supports variable arguments.
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/*
+ * Retracts the input values from the accumulator instance. The current design assumes the
+ * inputs are the values that have been previously accumulated. The method retract can be
+ * overloaded with different custom types and arguments. This method must be implemented for
+ * bounded OVER aggregates over unbounded tables.
+ *
+ * param: accumulator           the accumulator which contains the current aggregated results
+ * param: [user defined inputs] the input value (usually obtained from new arrived data).
+ */
+def retract(accumulator: ACC, [user defined inputs]): Unit
+{% endhighlight %}
+</div>
 
-Detailed documentation for all methods of `TableAggregateFunction` is given below. 
+</div>
 
+**`merge(...)`**
 <div class="codetabs" markdown="1">
+
 <div data-lang="java" markdown="1">
 {% highlight java %}
-
-/**
-  * Base class for user-defined aggregates and table aggregates.
-  *
-  * @param <T>   the type of the aggregation result.
-  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  */
-public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {
-
-  /**
-    * Creates and init the Accumulator for this (table)aggregate function.
-    *
-    * @return the accumulator with the initial value
-    */
-  public ACC createAccumulator(); // MANDATORY
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's result.
-    *
-    * @return The TypeInformation of the (table)aggregate function's result or null if the result
-    *         type should be automatically inferred.
-    */
-  public TypeInformation<T> getResultType = null; // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's accumulator.
-    *
-    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
-    *         accumulator type should be automatically inferred.
-    */
-  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
-}
-
-/**
-  * Base class for table aggregation functions. 
-  *
-  * @param <T>   the type of the aggregation result
-  * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute a table aggregation result.
-  *             TableAggregateFunction represents its state using accumulator, thereby the state of
-  *             the TableAggregateFunction must be put into the accumulator.
-  */
-public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
-
-  /** Processes the input values and update the provided accumulator instance. The method
-    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
-    * requires at least one accumulate() method.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY
-
-  /**
-    * Retracts the input values from the accumulator instance. The current design assumes the
-    * inputs are the values that have been previously accumulated. The method retract can be
-    * overloaded with different custom types and arguments. This function must be implemented for
-    * datastream bounded over aggregate.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL
-
-  /**
-    * Merges a group of accumulator instances into one accumulator instance. This function must be
-    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
-    *
-    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
-    *                     be noted that the accumulator may contain the previous aggregated
-    *                     results. Therefore user should not replace or clean this instance in the
-    *                     custom merge method.
-    * @param its          an {@link java.lang.Iterable} pointed to a group of accumulators that will be
-    *                     merged.
-    */
-  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
-
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result  (periodically emitted as data arrive) or
-    * the final result of the  aggregation.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the collector used to output data
-    */
-  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
-  
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result (periodically emitted as data arrive) or
-    * the final result of the aggregation.
-    *
-    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
-    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
-    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
-    * method will be used in preference to the emitValue method if both methods are defined in the
-    * table aggregate function, because the method is treated to be more efficient than emitValue
-    * as it can outputvalues incrementally.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the retractable collector used to output data. Use collect method
-    *                    to output(add) records and use retract method to retract(delete)
-    *                    records.
-    */
-  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
-  
-  /**
-    * Collects a record and forwards it. The collector can output retract messages with the retract
-    * method. Note: only use it in {@code emitRetractValueIncrementally}.
-    */
-  public interface RetractableCollector<T> extends Collector<T> {
-
-      /**
-        * Retract a record.
-        *
-        * @param record The record to retract.
-        */
-      void retract(T record);
-  }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore user should not replace or clean this instance in the
+ *                    custom merge method.
+ * param: iterable    an java.lang.Iterable pointed to a group of accumulators that will be
+ *                    merged.
+ */
+public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-/**
-  * Base class for user-defined aggregates and table aggregates.
-  *
-  * @tparam T   the type of the aggregation result.
-  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  */
-abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction {
-
-  /**
-    * Creates and init the Accumulator for this (table)aggregate function.
-    *
-    * @return the accumulator with the initial value
-    */
-  def createAccumulator(): ACC // MANDATORY
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's result.
-    *
-    * @return The TypeInformation of the (table)aggregate function's result or null if the result
-    *         type should be automatically inferred.
-    */
-  def getResultType: TypeInformation[T] = null // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the (table)aggregate function's accumulator.
-    *
-    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
-    *         accumulator type should be automatically inferred.
-    */
-  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
-}
-
-/**
-  * Base class for table aggregation functions. 
-  *
-  * @tparam T   the type of the aggregation result
-  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
-  *             aggregated values which are needed to compute an aggregation result.
-  *             TableAggregateFunction represents its state using accumulator, thereby the state of
-  *             the TableAggregateFunction must be put into the accumulator.
-  */
-abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, ACC] {
-
-  /**
-    * Processes the input values and update the provided accumulator instance. The method
-    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
-    * requires at least one accumulate() method.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY
-
-  /**
-    * Retracts the input values from the accumulator instance. The current design assumes the
-    * inputs are the values that have been previously accumulated. The method retract can be
-    * overloaded with different custom types and arguments. This function must be implemented for
-    * datastream bounded over aggregate.
-    *
-    * @param accumulator           the accumulator which contains the current aggregated results
-    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
-    */
-  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL
-
-  /**
-    * Merges a group of accumulator instances into one accumulator instance. This function must be
-    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
-    *
-    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
-    *                     be noted that the accumulator may contain the previous aggregated
-    *                     results. Therefore user should not replace or clean this instance in the
-    *                     custom merge method.
-    * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
-    *                     merged.
-    */
-  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL
-  
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result  (periodically emitted as data arrive) or
-    * the final result of the  aggregation.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the collector used to output data
-    */
-  def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL
-
-  /**
-    * Called every time when an aggregation result should be materialized. The returned value
-    * could be either an early and incomplete result (periodically emitted as data arrive) or
-    * the final result of the aggregation.
-    *
-    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
-    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
-    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
-    * method will be used in preference to the emitValue method if both methods are defined in the
-    * table aggregate function, because the method is treated to be more efficient than emitValue
-    * as it can outputvalues incrementally.
-    *
-    * @param accumulator the accumulator which contains the current
-    *                    aggregated results
-    * @param out         the retractable collector used to output data. Use collect method
-    *                    to output(add) records and use retract method to retract(delete)
-    *                    records.
-    */
-  def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit // OPTIONAL
- 
-  /**
-    * Collects a record and forwards it. The collector can output retract messages with the retract
-    * method. Note: only use it in `emitRetractValueIncrementally`.
-    */
-  trait RetractableCollector[T] extends Collector[T] {
-    
-    /**
-      * Retract a record.
-      *
-      * @param record The record to retract.
-      */
-    def retract(record: T): Unit
-  }
-}
+/*
+ * Merges a group of accumulator instances into one accumulator instance. This method must be
+ * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
+ *
+ * param: accumulator the accumulator which will keep the merged aggregate results. It should
+ *                    be noted that the accumulator may contain the previous aggregated
+ *                    results. Therefore user should not replace or clean this instance in the
+ *                    custom merge method.
+ * param: iterable    an java.lang.Iterable pointed to a group of accumulators that will be
+ *                    merged.
+ */
+def merge(accumulator: ACC, iterable: java.lang.Iterable[ACC]): Unit
 {% endhighlight %}
 </div>
+
 </div>
 
+{% top %}
+
+Table Aggregate Functions
+-------------------------
+
+A user-defined table aggregate function (_UDTAGG_) maps scalar values of multiple rows to zero, one,
+or multiple rows. The returned record may consist of one or more fields. If an output row consists of
+only one field, the row can be omitted and a scalar value can be emitted. It will be wrapped into an
+implicit row by the runtime.

Review comment:
       I will replace `and` with `that` and fix the `an`.
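
       For readers of the quoted hunk, the two contracts it describes can be sketched roughly as follows. This is a minimal illustration in plain Java and not Flink's `TableAggregateFunction` API: the class `Top2Sketch`, the accumulator `Top2Accumulator`, the helper `emitToList`, and the use of `java.util.function.Consumer` in place of Flink's `Collector` are all assumptions made so the example runs without Flink on the classpath. It shows `merge` updating the target accumulator in place instead of replacing it, and an emit step that produces zero, one, or two single-field rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Illustration only: plain Java stand-ins, not Flink's actual classes. */
public class Top2Sketch {

    /** Hypothetical accumulator holding the two largest values seen so far. */
    public static class Top2Accumulator {
        public Integer first;   // largest value, null if nothing was accumulated
        public Integer second;  // second largest value, null if fewer than two values
    }

    public static Top2Accumulator createAccumulator() {
        return new Top2Accumulator();
    }

    /** Processes one input row, which is a single scalar value in this sketch. */
    public static void accumulate(Top2Accumulator acc, Integer value) {
        if (value == null) {
            return; // nothing to add
        }
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    /**
     * Folds the other accumulators into acc. The target may already hold
     * earlier results, so it is updated in place and never replaced or reset.
     */
    public static void merge(Top2Accumulator acc, Iterable<Top2Accumulator> others) {
        for (Top2Accumulator other : others) {
            accumulate(acc, other.first);
            accumulate(acc, other.second);
        }
    }

    /** Emits zero, one, or two rows; Consumer stands in for a collector. */
    public static void emitValue(Top2Accumulator acc, Consumer<Integer> out) {
        if (acc.first != null) {
            out.accept(acc.first);
        }
        if (acc.second != null) {
            out.accept(acc.second);
        }
    }

    /** Hypothetical helper that gathers the emitted rows into a list. */
    public static List<Integer> emitToList(Top2Accumulator acc) {
        List<Integer> rows = new ArrayList<>();
        emitValue(acc, rows::add);
        return rows;
    }

    public static void main(String[] args) {
        Top2Accumulator a = createAccumulator();
        System.out.println(emitToList(a)); // no input yet, so no rows are emitted

        accumulate(a, 3);
        accumulate(a, 7);
        Top2Accumulator b = createAccumulator();
        accumulate(b, 5);

        merge(a, Arrays.asList(b));        // a keeps its earlier state and absorbs b
        System.out.println(emitToList(a));
    }
}
```

       Each emitted value is a bare scalar; in the wording of the quoted paragraph, that corresponds to the case where the output row has only one field and the runtime wraps the value into an implicit row.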





[GitHub] [flink] flinkbot commented on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673578366


   ## CI report:
   
   * e3f355ba1e729543a1b902c6f3fba3c6836f5023 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673578366


   ## CI report:
   
   * e3f355ba1e729543a1b902c6f3fba3c6836f5023 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512) 
   * 47d9b86d8a44e691fd0779b66bb702d3424aa886 UNKNOWN
   

[GitHub] [flink] flinkbot edited a comment on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673578366


   ## CI report:
   
   * e3f355ba1e729543a1b902c6f3fba3c6836f5023 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512) 
   

[GitHub] [flink] flinkbot edited a comment on pull request #13143: [FLINK-18936][docs] Update documentation around aggregate functions

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13143:
URL: https://github.com/apache/flink/pull/13143#issuecomment-673578366


   ## CI report:
   
   * e3f355ba1e729543a1b902c6f3fba3c6836f5023 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5512) 
   