Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/19 05:24:07 UTC

[flink] branch release-1.11 updated: [FLINK-16094][docs-zh] Translate /dev/table/functions/udfs.zh.md into Chinese

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4314e7a  [FLINK-16094][docs-zh] Translate /dev/table/functions/udfs.zh.md into Chinese
4314e7a is described below

commit 4314e7af4bfc46f8f61dd58a1b2a9350e91c38fb
Author: libenchao <li...@gmail.com>
AuthorDate: Sun Feb 23 15:21:29 2020 +0800

    [FLINK-16094][docs-zh] Translate /dev/table/functions/udfs.zh.md into Chinese
    
    This closes #11191
---
 docs/dev/table/functions/udfs.md    | 101 +++++-----
 docs/dev/table/functions/udfs.zh.md | 388 ++++++++++++++++++------------------
 2 files changed, 237 insertions(+), 252 deletions(-)
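
Reviewer note: the translated "Aggregation Functions" text at the end of this diff describes the accumulator lifecycle (`createAccumulator()`, then `accumulate()` once per row, then `getValue`). The sketch below mirrors that sequence in plain Python as a reading aid. It is not Flink code: `MaxPrice`, `MaxPriceAccumulator`, `aggregate`, and the five prices are hypothetical stand-ins made up for illustration.

```python
from typing import Iterable, Optional


class MaxPriceAccumulator:
    """Holds the intermediate result of the aggregation (hypothetical name)."""

    def __init__(self) -> None:
        self.max_price: Optional[float] = None


class MaxPrice:
    """Plain-Python stand-in for a max() aggregate; not a Flink class."""

    def create_accumulator(self) -> MaxPriceAccumulator:
        # Step 1: start from an empty accumulator.
        return MaxPriceAccumulator()

    def accumulate(self, acc: MaxPriceAccumulator, price: float) -> None:
        # Step 2: called once per input row to update the accumulator.
        if acc.max_price is None or price > acc.max_price:
            acc.max_price = price

    def get_value(self, acc: MaxPriceAccumulator) -> Optional[float]:
        # Step 3: compute the final result after all rows were processed.
        return acc.max_price


def aggregate(fn: MaxPrice, prices: Iterable[float]) -> Optional[float]:
    """Drives the three steps in the order the documentation describes."""
    acc = fn.create_accumulator()
    for price in prices:
        fn.accumulate(acc, price)
    return fn.get_value(acc)


if __name__ == "__main__":
    # Five made-up beverage prices, standing in for the 5-row table.
    print(aggregate(MaxPrice(), [3.5, 2.0, 9.0, 4.25, 1.0]))
```

The driver function plays the role of the runtime: the function object itself never loops over rows, it only reacts to one row at a time.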

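Reviewer note: the "Table Functions" text in this diff distinguishes `joinLateral` (cross join with every row the table function emits) from `leftOuterJoinLateral` (same, but an outer row is kept even when the function emits nothing). A minimal plain-Python sketch of that difference follows. The helper names are hypothetical, and unlike the `Split` example in the docs this stand-in assumes empty tokens are skipped, so that an empty input produces zero rows.

```python
from typing import Callable, Iterable, List, Optional, Tuple

Emitted = Tuple[str, int]
Joined = Tuple[str, Optional[str], Optional[int]]


def split(text: str, separator: str = " ") -> Iterable[Emitted]:
    """Hypothetical table function: emits one (word, length) row per token."""
    for word in text.split(separator):
        if word:  # assumption: empty tokens are skipped
            yield (word, len(word))


def join_lateral(rows: List[str], fn: Callable[[str], Iterable[Emitted]]) -> List[Joined]:
    """Cross join: outer rows with no emitted rows disappear from the result."""
    return [(a, word, length) for a in rows for word, length in fn(a)]


def left_outer_join_lateral(rows: List[str], fn: Callable[[str], Iterable[Emitted]]) -> List[Joined]:
    """Left outer join: outer rows with no emitted rows are kept, padded with None."""
    result: List[Joined] = []
    for a in rows:
        emitted = list(fn(a))
        if not emitted:
            result.append((a, None, None))
        for word, length in emitted:
            result.append((a, word, length))
    return result


if __name__ == "__main__":
    table = ["hello world", ""]
    print(join_lateral(table, split))
    print(left_outer_join_lateral(table, split))
```

In SQL terms the first helper corresponds to the `, LATERAL TABLE(...)` form and the second to `LEFT JOIN LATERAL TABLE(...) ON TRUE`, as shown in the examples in the diff.
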
diff --git a/docs/dev/table/functions/udfs.md b/docs/dev/table/functions/udfs.md
index 26f557a..c53a258 100644
--- a/docs/dev/table/functions/udfs.md
+++ b/docs/dev/table/functions/udfs.md
@@ -215,6 +215,30 @@ tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(s
 {% endhighlight %}
 </div>
 
+Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
+
+By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
+
+The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+
+{% highlight java %}
+public class CustomTypeSplit extends TableFunction<Row> {
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            Row row = new Row(2);
+            row.setField(0, s);
+            row.setField(1, s.length());
+            collect(row);
+        }
+    }
+
+    @Override
+    public TypeInformation<Row> getResultType() {
+        return Types.ROW(Types.STRING(), Types.INT());
+    }
+}
+{% endhighlight %}
+
 <div data-lang="scala" markdown="1">
 In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table functio [...]
 
@@ -250,6 +274,30 @@ tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a))
 tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
 {% endhighlight %}
 **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
+
+Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
+
+By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
+
+The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+
+{% highlight scala %}
+class CustomTypeSplit extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    str.split(" ").foreach({ s =>
+      val row = new Row(2)
+      row.setField(0, s)
+      row.setField(1, s.length)
+      collect(row)
+    })
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    Types.ROW(Types.STRING, Types.INT)
+  }
+}
+{% endhighlight %}
+
 </div>
 
 <div data-lang="python" markdown="1">
@@ -288,59 +336,6 @@ Please refer to the [Python Table Function]({{ site.baseurl }}/dev/table/python/
 </div>
 </div>
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
-
-By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
-
-The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
-
-{% highlight java %}
-public class CustomTypeSplit extends TableFunction<Row> {
-    public void eval(String str) {
-        for (String s : str.split(" ")) {
-            Row row = new Row(2);
-            row.setField(0, s);
-            row.setField(1, s.length());
-            collect(row);
-        }
-    }
-
-    @Override
-    public TypeInformation<Row> getResultType() {
-        return Types.ROW(Types.STRING(), Types.INT());
-    }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
-
-By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
-
-The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
-
-{% highlight scala %}
-class CustomTypeSplit extends TableFunction[Row] {
-  def eval(str: String): Unit = {
-    str.split(" ").foreach({ s =>
-      val row = new Row(2)
-      row.setField(0, s)
-      row.setField(1, s.length)
-      collect(row)
-    })
-  }
-
-  override def getResultType: TypeInformation[Row] = {
-    Types.ROW(Types.STRING, Types.INT)
-  }
-}
-{% endhighlight %}
-</div>
-</div>
-
 {% top %}
 
 
diff --git a/docs/dev/table/functions/udfs.zh.md b/docs/dev/table/functions/udfs.zh.md
index c0e7c67..995a67c 100644
--- a/docs/dev/table/functions/udfs.zh.md
+++ b/docs/dev/table/functions/udfs.zh.md
@@ -22,33 +22,31 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-User-defined functions are an important feature, because they significantly extend the expressiveness of queries.
+自定义函数是一个非常重要的功能,因为它极大地扩展了查询的表达能力。
 
 * This will be replaced by the TOC
 {:toc}
 
-Register User-Defined Functions
+注册自定义函数
 -------------------------------
-In most cases, a user-defined function must be registered before it can be used in an query. It is not necessary to register functions for the Scala Table API. 
+在大多数情况下,自定义函数在使用之前都需要注册。在 Scala Table API 中可以不用注册。
 
-Functions are registered at the `TableEnvironment` by calling a `registerFunction()` method. When a user-defined function is registered, it is inserted into the function catalog of the `TableEnvironment` such that the Table API or SQL parser can recognize and properly translate it. 
-
-Please find detailed examples of how to register and how to call each type of user-defined function 
-(`ScalarFunction`, `TableFunction`, and `AggregateFunction`) in the following sub-sessions.
+通过调用 `registerFunction()` 把函数注册到 `TableEnvironment`。当一个函数注册之后,它就在 `TableEnvironment` 的函数 catalog 里面了,这样 Table API 或者 SQL 解析器就可以识别并使用它。
 
+关于如何注册和使用每种类型的自定义函数(标量函数、表值函数和聚合函数),更多示例可以看下面的部分。
 
 {% top %}
 
-Scalar Functions
+标量函数
 ----------------
 
-If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar values to a new scalar value.
+如果需要的标量函数没有被内置函数覆盖,就可以自定义一个标量函数,在 Table API 和 SQL 中使用。自定义标量函数可以把 0 到多个标量值映射成 1 个标量值。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-In order to define a scalar function, one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by imp [...]
+想要实现自定义标量函数,你需要扩展 `org.apache.flink.table.functions` 里面的 `ScalarFunction` 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是 `public` 的,而且名字必须是 `eval`。求值方法的参数类型以及返回值类型就决定了标量函数的参数类型和返回值类型。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法也支持可变参数,例如 `eval(String... strs)`。
 
-The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
+下面的示例展示了如何实现一个求哈希值的函数。先把它注册到 `TableEnvironment` 里,然后在查询的时候就可以直接使用了。需要注意的是,你可以在注册之前通过构造方法来配置你的标量函数:
 
 {% highlight java %}
 public class HashCode extends ScalarFunction {
@@ -65,19 +63,19 @@ public class HashCode extends ScalarFunction {
 
 BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
 
-// register the function
+// 注册函数
 tableEnv.registerFunction("hashCode", new HashCode(10));
 
-// use the function in Java Table API
+// 在 Java Table API 中使用函数
 myTable.select("string, string.hashCode(), hashCode(string)");
 
-// use the function in SQL API
+// 在 SQL API 中使用函数
 tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
 {% endhighlight %}
 
-By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
+求值方法的返回值类型默认是由 Flink 的类型推导来决定的。类型推导可以推导出基本数据类型以及简单的 POJO,但是对于更复杂的、自定义的、或者组合类型,可能会推导出错误的结果。在这种情况下,可以通过覆盖 `ScalarFunction#getResultType()`,并且返回 `TypeInformation` 来定义复杂类型。
 
-The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
+下面的示例展示了一个高级一点的自定义标量函数用法,它接收一个内部的时间戳参数,并且以 `long` 的形式返回该内部的时间戳。通过覆盖 `ScalarFunction#getResultType()`,我们定义了我们返回的 `long` 类型在代码生成时可以被解析为 `Types.TIMESTAMP` 类型。
 
 {% highlight java %}
 public static class TimestampModifier extends ScalarFunction {
@@ -93,12 +91,12 @@ public static class TimestampModifier extends ScalarFunction {
 </div>
 
 <div data-lang="scala" markdown="1">
-In order to define a scalar function, one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by imp [...]
+想要实现自定义标量函数,你需要扩展 `org.apache.flink.table.functions` 里面的 `ScalarFunction` 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是 `public` 的,而且名字必须是 `eval`。求值方法的参数类型以及返回值类型就决定了标量函数的参数类型和返回值类型。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法也支持可变参数,例如 `@varargs def eval(str: String*)`。
 
-The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
+下面的示例展示了如何实现一个求哈希值的函数。先把它注册到 `TableEnvironment` 里,然后在查询的时候就可以直接使用了。需要注意的是,你可以在注册之前通过构造方法来配置你的标量函数:
 
 {% highlight scala %}
-// must be defined in static/object context
+// 必须定义在 static/object 上下文中
 class HashCode(factor: Int) extends ScalarFunction {
   def eval(s: String): Int = {
     s.hashCode() * factor
@@ -107,18 +105,18 @@ class HashCode(factor: Int) extends ScalarFunction {
 
 val tableEnv = BatchTableEnvironment.create(env)
 
-// use the function in Scala Table API
+// 在 Scala Table API 中使用函数
 val hashCode = new HashCode(10)
 myTable.select('string, hashCode('string))
 
-// register and use the function in SQL
+// 在 SQL 中注册和使用函数
 tableEnv.registerFunction("hashCode", new HashCode(10))
 tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
 {% endhighlight %}
 
-By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
+求值方法的返回值类型默认是由 Flink 的类型推导来决定的。类型推导可以推导出基本数据类型以及简单的 POJO,但是对于更复杂的、自定义的、或者组合类型,可能会推导出错误的结果。在这种情况下,可以通过覆盖 `ScalarFunction#getResultType()`,并且返回 `TypeInformation` 来定义复杂类型。
 
-The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
+下面的示例展示了一个高级一点的自定义标量函数用法,它接收一个内部的时间戳参数,并且以 `long` 的形式返回该内部的时间戳。通过覆盖 `ScalarFunction#getResultType()`,我们定义了我们返回的 `long` 类型在代码生成时可以被解析为 `Types.TIMESTAMP` 类型。
 
 {% highlight scala %}
 object TimestampModifier extends ScalarFunction {
@@ -134,9 +132,9 @@ object TimestampModifier extends ScalarFunction {
 </div>
 
 <div data-lang="python" markdown="1">
-In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method which is named `eval`.
+要定义一个 Python 标量函数,你可以继承 `pyflink.table.udf` 下的 `ScalarFunction`,并且实现一个求值函数。Python 标量函数的行为取决于你实现的求值函数,它的名字必须是 `eval`。
 
-The following example shows how to define your own Python hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
+下面的示例展示了如何自定义一个 Python 的求哈希值的函数,并且把它注册到 `TableEnvironment` 里,然后在查询中使用它。你可以在注册函数之前通过构造函数来配置你的标量函数。
 
 {% highlight python %}
 class HashCode(ScalarFunction):
@@ -148,39 +146,39 @@ class HashCode(ScalarFunction):
 
 table_env = BatchTableEnvironment.create(env)
 
-# register the Python function
+# 注册 Python 函数
 table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
 
-# use the function in Python Table API
+# 在 Python Table API 中使用函数
 my_table.select("string, bigint, string.hash_code(), hash_code(string)")
 
-# use the function in SQL API
+# 在 SQL API 中使用函数
 table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
 {% endhighlight %}
 
-There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`.
-Please refer to the [Python Scalar Function]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html#scalar-functions) documentation for more details.
+除了继承 `ScalarFunction`,还有很多方法可以定义 Python 标量函数。
+更多细节,可以参考 [Python 标量函数]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html#scalar-functions) 文档。
 </div>
 </div>
 
 {% top %}
 
-Table Functions
+表值函数
 ---------------
 
-Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns. 
+跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table functio [...]
+要定义一个表值函数,你需要扩展 `org.apache.flink.table.functions` 下的 `TableFunction`,并且实现(一个或者多个)求值方法。表值函数的行为取决于你实现的求值方法。求值方法必须被声明为 `public`,并且名字必须是 `eval`。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法的参数类型决定了表值函数的参数类型。表值函数也可以支持变长参数,比如 `eval(String... strs)`。表值函数返回的表的类型取决于 `TableFunction` 的泛型参数。求值方法通过 `collect(T)` 方法来发送要输出的行。
 
-In the Table API, a table function is used with `.joinLateral` or `.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoinLateral` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the o [...]
+在 Table API 中,表值函数是通过 `.joinLateral` 或者 `.leftOuterJoinLateral` 来使用的。`joinLateral` 算子会把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join。`leftOuterJoinLateral` 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。在 SQL 里面用 CROSS JOIN 或者以 ON TRUE 为条件的 LEFT JOIN 来配合 `LATERAL TABLE(<TableFunction>)` 使用。
 
-The following example shows how to define table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered: 
+下面的例子展示了如何定义一个表值函数,如何在 TableEnvironment 中注册表值函数,以及如何在查询中使用表值函数。你可以在注册之前通过构造函数来配置你的表值函数:
 
 {% highlight java %}
-// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
+// 泛型参数的类型 "Tuple2<String, Integer>" 决定了返回的表的 schema 是(String,Integer)。
 public class Split extends TableFunction<Tuple2<String, Integer>> {
     private String separator = " ";
     
@@ -190,7 +188,7 @@ public class Split extends TableFunction<Tuple2<String, Integer>> {
     
     public void eval(String str) {
         for (String s : str.split(separator)) {
-            // use collect(...) to emit a row
+            // 使用 collect(...) 来输出一行数据
             collect(new Tuple2<String, Integer>(s, s.length()));
         }
     }
@@ -199,36 +197,61 @@ public class Split extends TableFunction<Tuple2<String, Integer>> {
 BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
 Table myTable = ...         // table schema: [a: String]
 
-// Register the function.
+// 注册表值函数。
 tableEnv.registerFunction("split", new Split("#"));
 
-// Use the table function in the Java Table API. "as" specifies the field names of the table.
+// 在 Java Table API 中使用表值函数。"as" 指明了表的字段名字
 myTable.joinLateral("split(a) as (word, length)")
     .select("a, word, length");
 myTable.leftOuterJoinLateral("split(a) as (word, length)")
     .select("a, word, length");
 
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API).
+// 在 SQL 中用 LATERAL 和 TABLE 关键字来使用表值函数
+// CROSS JOIN a table function (等价于 Table API 中的 "join").
 tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
+// LEFT JOIN a table function (等价于 Table API 中的 "leftOuterJoin").
 tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
 {% endhighlight %}
+
+需要注意的是 POJO 类型没有确定的字段顺序。所以,你不可以用 `AS` 来重命名返回的 POJO 的字段。
+
+`TableFunction` 的返回类型默认是用 Flink 自动类型推导来决定的。对于基础类型和简单的 POJO 类型推导是没有问题的,但是对于更复杂的、自定义的、以及组合的类型可能会推导错误。如果有这种情况,可以通过重写(override) `TableFunction#getResultType()` 并且返回 `TypeInformation` 来指定返回类型。
+
+下面的例子展示了 `TableFunction` 返回了一个 `Row` 类型,需要显式指定返回类型。我们通过重写 `TableFunction#getResultType` 来指定 `RowTypeInfo(String, Integer)` 作为返回的表的类型。
+
+{% highlight java %}
+public class CustomTypeSplit extends TableFunction<Row> {
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            Row row = new Row(2);
+            row.setField(0, s);
+            row.setField(1, s.length());
+            collect(row);
+        }
+    }
+
+    @Override
+    public TypeInformation<Row> getResultType() {
+        return Types.ROW(Types.STRING(), Types.INT());
+    }
+}
+{% endhighlight %}
+
 </div>
 
 <div data-lang="scala" markdown="1">
 
-In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table functio [...]
+要定义一个表值函数,你需要扩展 `org.apache.flink.table.functions` 下的 `TableFunction`,并且实现(一个或者多个)求值方法。表值函数的行为取决于你实现的求值方法。求值方法必须被声明为 `public`,并且名字必须是 `eval`。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法的参数类型决定了表值函数的参数类型。表值函数也可以支持变长参数,比如 `eval(String... strs)`。表值函数返回的表的类型取决于 `TableFunction` 的泛型参数。求值方法通过 `collect(T)` 方法来发送要输出的行。
 
-In the Table API, a table function is used with `.joinLateral` or `.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoinLateral` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the o [...]
+在 Table API 中,表值函数是通过 `.joinLateral` 或者 `.leftOuterJoinLateral` 来使用的。`joinLateral` 算子会把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join。`leftOuterJoinLateral` 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。在 SQL 里面用 CROSS JOIN 或者以 ON TRUE 为条件的 LEFT JOIN 来配合 `LATERAL TABLE(<TableFunction>)` 使用。
 
-The following example shows how to define table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered: 
+下面的例子展示了如何定义一个表值函数,如何在 TableEnvironment 中注册表值函数,以及如何在查询中使用表值函数。你可以在注册之前通过构造函数来配置你的表值函数:
 
 {% highlight scala %}
-// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer).
+// 泛型参数的类型 "(String, Int)" 决定了返回的表的 schema 是 (String, Integer)。
 class Split(separator: String) extends TableFunction[(String, Int)] {
   def eval(str: String): Unit = {
-    // use collect(...) to emit a row.
+    // 使用 collect(...) 来输出一行
     str.split(separator).foreach(x => collect((x, x.length)))
   }
 }
@@ -236,30 +259,53 @@ class Split(separator: String) extends TableFunction[(String, Int)] {
 val tableEnv = BatchTableEnvironment.create(env)
 val myTable = ...         // table schema: [a: String]
 
-// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
+// 在 Scala Table API 中使用表值函数(注意:在 Scala Table API 中不需要注册函数)
 val split = new Split("#")
-// "as" specifies the field names of the generated table.
+// "as" 指明了返回表的字段名字
 myTable.joinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length)
 myTable.leftOuterJoinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length)
 
-// Register the table function to use it in SQL queries.
+// 注册表值函数,然后才能在 SQL 查询中使用
 tableEnv.registerFunction("split", new Split("#"))
 
-// Use the table function in SQL with LATERAL and TABLE keywords.
+// 在 SQL 中使用 LATERAL 和 TABLE 关键字来使用表值函数
 // CROSS JOIN a table function (equivalent to "join" in Table API)
 tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
 // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
 tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
 {% endhighlight %}
-**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
+**重要:**不要把表值函数实现成一个 Scala object。Scala object 是一个单例,会有并发的问题。
+
+需要注意的是 POJO 类型没有确定的字段顺序。所以,你不可以用 `AS` 来重命名返回的 POJO 的字段。
+
+`TableFunction` 的返回类型默认是用 Flink 自动类型推导来决定的。对于基础类型和简单的 POJO 类型推导是没有问题的,但是对于更复杂的、自定义的、以及组合的类型可能会推导错误。如果有这种情况,可以通过重写(override) `TableFunction#getResultType()` 并且返回 `TypeInformation` 来指定返回类型。
+
+下面的例子展示了 `TableFunction` 返回了一个 `Row` 类型,需要显式指定返回类型。我们通过重写 `TableFunction#getResultType` 来返回 `RowTypeInfo` 作为返回类型。
+
+{% highlight scala %}
+class CustomTypeSplit extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    str.split(" ").foreach({ s =>
+      val row = new Row(2)
+      row.setField(0, s)
+      row.setField(1, s.length)
+      collect(row)
+    })
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    Types.ROW(Types.STRING, Types.INT)
+  }
+}
+{% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
-In order to define a Python table function, one can extend the base class `TableFunction` in `pyflink.table.udtf` and Implement an evaluation method. The behavior of a Python table function is determined by the evaluation method which is named eval.
+要实现一个 Python 表值函数,你可以扩展 `pyflink.table.udtf` 下的 `TableFunction`,并且实现一个求值方法。Python 表值函数的行为取决于你实现的求值方法,它的名字必须是 `eval`。
 
-In the Python Table API, a Python table function is used with `.join_lateral` or `.left_outer_join_lateral`. The `join_lateral` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `left_outer_join_lateral` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on t [...]
+在 Python Table API 中,表值函数是通过 `.join_lateral` 或者 `.left_outer_join_lateral` 来使用的。`join_lateral` 算子会把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join。`left_outer_join_lateral` 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。在 SQL 里面用 CROSS JOIN 或者以 ON TRUE 为条件的 LEFT JOIN 来配合 `LATERAL TABLE(<TableFunction>)` 使用。
 
-The following example shows how to define a Python table function, registered it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered:
+下面的例子展示了如何定义一个 Python 表值函数,如何在 TableEnvironment 中注册表值函数,以及如何在查询中使用表值函数。你可以在注册之前通过构造函数来配置你的表值函数:
 
 {% highlight python %}
 class Split(TableFunction):
@@ -271,115 +317,60 @@ env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env)
 my_table = ...  # type: Table, table schema: [a: String]
 
-# register the Python Table Function
+# 注册 Python 表值函数
 table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
 
-# use the Python Table Function in Python Table API
+# 在 Python Table API 中使用 Python 表值函数
 my_table.join_lateral("split(a) as (word, length)")
 my_table.left_outer_join_lateral("split(a) as (word, length)")
 
-# use the Python Table function in SQL API
+# 在 SQL API 中使用 Python 表值函数
 table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
 table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
 
 {% endhighlight %}
 
-There are many ways to define a Python table function besides extending the base class `TableFunction`.
-Please refer to the [Python Table Function]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html#table-functions) documentation for more details.
-
-</div>
-</div>
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
-
-By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
-
-The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
-
-{% highlight java %}
-public class CustomTypeSplit extends TableFunction<Row> {
-    public void eval(String str) {
-        for (String s : str.split(" ")) {
-            Row row = new Row(2);
-            row.setField(0, s);
-            row.setField(1, s.length());
-            collect(row);
-        }
-    }
-
-    @Override
-    public TypeInformation<Row> getResultType() {
-        return Types.ROW(Types.STRING(), Types.INT());
-    }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
-
-By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
-
-The following example shows an example of a `TableFunction` that returns a `Row` type which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
-
-{% highlight scala %}
-class CustomTypeSplit extends TableFunction[Row] {
-  def eval(str: String): Unit = {
-    str.split(" ").foreach({ s =>
-      val row = new Row(2)
-      row.setField(0, s)
-      row.setField(1, s.length)
-      collect(row)
-    })
-  }
+除了继承 `TableFunction`,还有很多其它方法可以定义 Python 表值函数。
+更多信息,参考 [Python 表值函数]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html#table-functions)文档。
 
-  override def getResultType: TypeInformation[Row] = {
-    Types.ROW(Types.STRING, Types.INT)
-  }
-}
-{% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
 
-Aggregation Functions
+聚合函数
 ---------------------
 
-User-Defined Aggregate Functions (UDAGGs) aggregate a table (one or more rows with one or more attributes) to a scalar value. 
+自定义聚合函数(UDAGG)是把一个表(一行或者多行,每行可以有一列或者多列)聚合成一个标量值。
 
 <center>
 <img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udagg-mechanism.png" width="80%">
 </center>
 
-The above figure shows an example of an aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, `id`, `name` and `price` and 5 rows. Imagine you need to find the highest price of all beverages in the table, i.e., perform a `max()` aggregation. You would need to check each of the 5 rows and the result would be a single numeric value.
+上面的图片展示了一个聚合的例子。假设你有一个关于饮料的表。表里面有三个字段,分别是 `id`、`name`、`price`,表里有 5 行数据。假设你需要找到所有饮料里最贵的饮料的价格,即执行一个 `max()` 聚合。你需要遍历所有 5 行数据,而结果就只有一个数值。
 
-User-defined aggregation functions are implemented by extending the `AggregateFunction` class. An `AggregateFunction` works as follows. First, it needs an `accumulator`, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the `createAccumulator()` method of the `AggregateFunction`. Subsequently, the `accumulate()` method of the function is called for each input row to update the accumulator. Once all rows have been [...]
+自定义聚合函数是通过扩展 `AggregateFunction` 来实现的。`AggregateFunction` 的工作过程如下。首先,它需要一个 `accumulator`,它是一个数据结构,存储了聚合的中间结果。通过调用 `AggregateFunction` 的 `createAccumulator()` 方法创建一个空的 accumulator。接下来,对于每一行数据,会调用 `accumulate()` 方法来更新 accumulator。当所有的数据都处理完了之后,通过调用 `getValue` 方法来计算和返回最终的结果。
 
-**The following methods are mandatory for each `AggregateFunction`:**
+**下面几个方法是每个 `AggregateFunction` 必须要实现的:**
 
 - `createAccumulator()`
 - `accumulate()` 
 - `getValue()`
 
-Flink’s type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So similar to `ScalarFunction` and `TableFunction`, `AggregateFunction` provides methods to specify the `TypeInformation` of the result type (through 
- `AggregateFunction#getResultType()`) and the type of the accumulator (through `AggregateFunction#getAccumulatorType()`).
+Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果,比如那些非基本类型和普通的 POJO 类型的复杂类型。所以跟 `ScalarFunction` 和 `TableFunction` 一样,`AggregateFunction` 也提供了 `AggregateFunction#getResultType()` 和 `AggregateFunction#getAccumulatorType()` 来分别指定返回值类型和 accumulator 的类型,两个函数的返回值类型也都是 `TypeInformation`。
  
-Besides the above methods, there are a few contracted methods that can be 
-optionally implemented. While some of these methods allow the system more efficient query execution, others are mandatory for certain use cases. For instance, the `merge()` method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that "connects" them). 
+除了上面的方法,还有几个方法可以选择实现。这些方法有些可以让查询更加高效,而有些是在某些特定场景下必须要实现的。例如,如果聚合函数用在会话窗口(当两个会话窗口合并的时候需要 merge 他们的 accumulator)的话,`merge()` 方法就是必须要实现的。
 
-**The following methods of `AggregateFunction` are required depending on the use case:**
+**`AggregateFunction` 的以下方法在某些场景下是必须实现的:**
 
-- `retract()` is required for aggregations on bounded `OVER` windows.
-- `merge()` is required for many batch aggregations and session window aggregations.
-- `resetAccumulator()` is required for many batch aggregations.
+- `retract()` 在 bounded `OVER` 窗口中是必须实现的。
+- `merge()` 在许多批式聚合和会话窗口聚合中是必须实现的。
+- `resetAccumulator()` 在许多批式聚合中是必须实现的。
 
-All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define a aggregate function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. The method [...]
+`AggregateFunction` 的所有方法都必须是 `public` 的,不能是 `static` 的,而且名字必须跟上面写的一样。`createAccumulator`、`getValue`、`getResultType` 以及 `getAccumulatorType` 这几个函数是在抽象类 `AggregateFunction` 中定义的,而其他函数都是约定的方法。如果要定义一个聚合函数,你需要扩展 `org.apache.flink.table.functions.AggregateFunction`,并且实现一个(或者多个)`accumulate` 方法。`accumulate` 方法可以重载,每个方法的参数类型不同,并且支持变长参数。
 
-Detailed documentation for all methods of `AggregateFunction` is given below. 
+`AggregateFunction` 的所有方法的详细文档如下。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -603,15 +594,15 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T,
 </div>
 
 
-The following example shows how to
+下面的例子展示了如何:
 
-- define an `AggregateFunction` that calculates the weighted average on a given column, 
-- register the function in the `TableEnvironment`, and 
-- use the function in a query.  
+- 定义一个聚合函数来计算某一列的加权平均,
+- 在 `TableEnvironment` 中注册函数,
+- 在查询中使用函数。
 
-To calculate an weighted average value, the accumulator needs to store the weighted sum and count of all the data that has been accumulated. In our example we define a class `WeightedAvgAccum` to be the accumulator. Accumulators are automatically backup-ed by Flink's checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics.
+为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数。在我们的例子里,我们定义了一个类 `WeightedAvgAccum` 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator,在失败时进行恢复,以此来保证精确一次的语义。
 
-The `accumulate()` method of our `WeightedAvg` `AggregateFunction` has three inputs. The first one is the `WeightedAvgAccum` accumulator, the other two are user-defined inputs: input value `ivalue` and weight of the input `iweight`. Although the `retract()`, `merge()`, and `resetAccumulator()` methods are not mandatory for most aggregation types, we provide them below as examples. Please note that we used Java primitive types and defined `getResultType()` and `getAccumulatorType()` metho [...]
+我们的 `WeightedAvg`(聚合函数)的 `accumulate` 方法有三个输入参数。第一个是 `WeightedAvgAccum` accumulator,另外两个是用户自定义的输入:输入的值 `ivalue` 和输入的权重 `iweight`。尽管 `retract()`、`merge()`、`resetAccumulator()` 这几个方法在大多数聚合类型中都不是必须实现的,我们也在样例中提供了它们的实现。请注意我们在 Scala 样例中也使用的是 Java 的基础类型,并且定义了 `getResultType()` 和 `getAccumulatorType()`,因为 Flink 的类型推导对于 Scala 的类型推导做的不是很好。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -668,11 +659,11 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum
     }
 }
 
-// register function
+// 注册函数
 StreamTableEnvironment tEnv = ...
 tEnv.registerFunction("wAvg", new WeightedAvg());
 
-// use function
+// 使用函数
 tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
 
 {% endhighlight %}
@@ -742,11 +733,11 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] {
   override def getResultType: TypeInformation[JLong] = Types.LONG
 }
 
-// register function
+// 注册函数
 val tEnv: StreamTableEnvironment = ???
 tEnv.registerFunction("wAvg", new WeightedAvg())
 
-// use function
+// 使用函数
 tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
 
 {% endhighlight %}
@@ -766,6 +757,7 @@ public static class WeightedAvgAccum {
 }
 
 // The Java class must have a public no-argument constructor and must be loadable by the current Java classloader.
+// Java 类必须有一个 public 的无参构造函数,并且可以在当前类加载器中加载到。
 
 /**
  * Weighted Average user-defined aggregate function.
@@ -812,11 +804,11 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum
 }
 '''
 
-# register function
+# 注册函数
 t_env = ...  # type: StreamTableEnvironment
 t_env.register_java_function("wAvg", "my.java.function.WeightedAvg")
 
-# use function
+# 使用函数
 t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
 
 {% endhighlight %}
@@ -826,46 +818,44 @@ t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores G
 
 {% top %}
 
-Table Aggregation Functions
+表值聚合函数
 ---------------------
 
-User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or more rows with one or more attributes) to a result table with multi rows and columns. 
+自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。
 
 <center>
 <img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" width="80%">
 </center>
 
-The above figure shows an example of a table aggregation. Assume you have a table that contains data about beverages. The table consists of three columns, `id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest prices of all beverages in the table, i.e., perform a `top2()` table aggregation. You would need to check each of the 5 rows and the result would be a table with the top 2 values.
+上图展示了一个表值聚合函数的例子。假设你有一个饮料的表,这个表有 3 列,分别是 `id`、`name` 和 `price`,一共有 5 行。假设你需要找到价格最高的两个饮料,类似于 `top2()` 表值聚合函数。你需要遍历所有 5 行数据,结果是有 2 行数据的一个表。
 
-User-defined table aggregation functions are implemented by extending the `TableAggregateFunction` class. A `TableAggregateFunction` works as follows. First, it needs an `accumulator`, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the `createAccumulator()` method of the `TableAggregateFunction`. Subsequently, the `accumulate()` method of the function is called for each input row to update the accumulator. Onc [...]
+用户自定义表值聚合函数是通过扩展 `TableAggregateFunction` 类来实现的。一个 `TableAggregateFunction` 的工作过程如下。首先,它需要一个 `accumulator`,这个 `accumulator` 负责存储聚合的中间结果。 通过调用 `TableAggregateFunction` 的 `createAccumulator` 方法来构造一个空的 accumulator。接下来,对于每一行数据,会调用 `accumulate` 方法来更新 accumulator。当所有数据都处理完之后,调用 `emitValue` 方法来计算和返回最终的结果。
 
-**The following methods are mandatory for each `TableAggregateFunction`:**
+**下面几个 `TableAggregateFunction` 的方法是必须要实现的:**
 
 - `createAccumulator()`
 - `accumulate()` 
 
-Flink’s type extraction facilities can fail to identify complex data types, e.g., if they are not basic types or simple POJOs. So similar to `ScalarFunction` and `TableFunction`, `TableAggregateFunction` provides methods to specify the `TypeInformation` of the result type (through 
- `TableAggregateFunction#getResultType()`) and the type of the accumulator (through `TableAggregateFunction#getAccumulatorType()`).
+Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果,比如那些非基本类型和普通的 POJO 类型的复杂类型。所以类似于 `ScalarFunction` 和 `TableFunction`,`TableAggregateFunction` 也提供了 `TableAggregateFunction#getResultType()` 和 `TableAggregateFunction#getAccumulatorType()` 方法来指定返回值类型和 accumulator 的类型,这两个方法都需要返回 `TypeInformation`。
  
-Besides the above methods, there are a few contracted methods that can be 
-optionally implemented. While some of these methods allow the system more efficient query execution, others are mandatory for certain use cases. For instance, the `merge()` method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that "connects" them). 
+除了上面的方法,还有几个其他的方法可以选择性的实现。有些方法可以让查询更加高效,而有些方法对于某些特定场景是必须要实现的。比如,在会话窗口(当两个会话窗口合并时会合并两个 accumulator)中使用聚合函数时,必须要实现 `merge()` 方法。
 
-**The following methods of `TableAggregateFunction` are required depending on the use case:**
+**下面几个 `TableAggregateFunction` 的方法在某些特定场景下是必须要实现的:**
 
-- `retract()` is required for aggregations on bounded `OVER` windows.
-- `merge()` is required for many batch aggregations and session window aggregations.
-- `resetAccumulator()` is required for many batch aggregations.
-- `emitValue()` is required for batch and window aggregations.
+- `retract()` 在 bounded `OVER` 窗口中的聚合函数必须要实现。
+- `merge()` 在许多批式聚合和会话窗口聚合中是必须要实现的。
+- `resetAccumulator()` 在许多批式聚合中是必须要实现的。
+- `emitValue()` 在批式聚合以及窗口聚合中是必须要实现的。
 
-**The following methods of `TableAggregateFunction` are used to improve the performance of streaming jobs:**
+**下面的 `TableAggregateFunction` 的方法可以提升流式任务的效率:**
 
-- `emitUpdateWithRetract()` is used to emit values that have been updated under retract mode.
+- `emitUpdateWithRetract()` 在 retract 模式下,该方法负责发送被更新的值。
 
-For `emitValue` method, it emits full data according to the accumulator. Take TopN as an example, `emitValue` emit all top n values each time. This may bring performance problems for streaming jobs. To improve the performance, a user can also implement `emitUpdateWithRetract` method to improve the performance. The method outputs data incrementally in retract mode, i.e., once there is an update, we have to retract old records before sending new updated ones. The method will be used in pre [...]
+`emitValue` 方法会发送所有 accumulator 给出的结果。拿 TopN 来说,`emitValue` 每次都会发送所有的最大的 n 个值。这在流式任务中可能会有一些性能问题。为了提升性能,用户可以实现 `emitUpdateWithRetract` 方法。这个方法在 retract 模式下会增量的输出结果,比如有数据更新了,我们必须要撤回老的数据,然后再发送新的数据。如果定义了 `emitUpdateWithRetract` 方法,那它会优先于 `emitValue` 方法被使用,因为一般认为 `emitUpdateWithRetract` 会更加高效,因为它的输出是增量的。
 
-All methods of `TableAggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getResultType`, and `getAccumulatorType` are defined in the parent abstract class of `TableAggregateFunction`, while others are contracted methods. In order to define a table aggregate function, one has to extend the base class `org.apache.flink.table.functions.TableAggregateFunction` and implement one (or more) `accumulate`  [...]
+`TableAggregateFunction` 的所有方法都必须是 `public` 的、非 `static` 的,而且名字必须跟上面提到的一样。`createAccumulator`、`getResultType` 和 `getAccumulatorType` 这三个方法是在抽象父类 `TableAggregateFunction` 中定义的,而其他的方法都是约定的方法。要实现一个表值聚合函数,你必须扩展 `org.apache.flink.table.functions.TableAggregateFunction`,并且实现一个(或者多个)`accumulate` 方法。`accumulate` 方法可以有多个重载的方法,也可以支持变长参数。
 
-Detailed documentation for all methods of `TableAggregateFunction` is given below. 
+`TableAggregateFunction` 的所有方法的详细文档如下。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1126,15 +1116,15 @@ abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFuncti
 </div>
 
 
-The following example shows how to
+下面的例子展示了如何
 
-- define a `TableAggregateFunction` that calculates the top 2 values on a given column, 
-- register the function in the `TableEnvironment`, and 
-- use the function in a Table API query(TableAggregateFunction is only supported by Table API).  
+- 定义一个 `TableAggregateFunction` 来计算给定列的最大的 2 个值,
+- 在 `TableEnvironment` 中注册函数,
+- 在 Table API 查询中使用函数(当前只在 Table API 中支持 TableAggregateFunction)。
 
-To calculate the top 2 values, the accumulator needs to store the biggest 2 values of all the data that has been accumulated. In our example we define a class `Top2Accum` to be the accumulator. Accumulators are automatically backup-ed by Flink's checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics.
+为了计算最大的 2 个值,accumulator 需要保存当前看到的最大的 2 个值。在我们的例子中,我们定义了类 `Top2Accum` 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator,并且在失败时进行恢复,来保证精确一次的语义。
 
-The `accumulate()` method of our `Top2` `TableAggregateFunction` has two inputs. The first one is the `Top2Accum` accumulator, the other one is the user-defined input: input value `v`. Although the `merge()` method is not mandatory for most table aggregation types, we provide it below as examples. Please note that we used Java primitive types and defined `getResultType()` and `getAccumulatorType()` methods in the Scala example because Flink type extraction does not work very well for Sca [...]
+我们的 `Top2` 表值聚合函数(`TableAggregateFunction`)的 `accumulate()` 方法有两个输入,第一个是 `Top2Accum` accumulator,另一个是用户定义的输入:输入的值 `v`。尽管 `merge()` 方法在大多数聚合类型中不是必须的,我们也在样例中提供了它的实现。请注意,我们在 Scala 样例中也使用的是 Java 的基础类型,并且定义了 `getResultType()` 和 `getAccumulatorType()` 方法,因为 Flink 的类型推导对于 Scala 的类型推导支持的不是很好。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1188,14 +1178,14 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>
     }
 }
 
-// register function
+// 注册函数
 StreamTableEnvironment tEnv = ...
 tEnv.registerFunction("top2", new Top2());
 
-// init table
+// 初始化表
 Table tab = ...;
 
-// use function
+// 使用函数
 tab.groupBy("key")
     .flatAggregate("top2(a) as (v, rank)")
     .select("key, v, rank");
@@ -1258,10 +1248,10 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
   }
 }
 
-// init table
+// 初始化表
 val tab = ...
 
-// use function
+// 使用函数
 tab
   .groupBy('key)
   .flatAggregate(top2('a) as ('v, 'rank))
@@ -1272,7 +1262,7 @@ tab
 </div>
 
 
-The following example shows how to use `emitUpdateWithRetract` method to emit only updates. To emit only updates, in our example, the accumulator keeps both old and new top 2 values. Note: if the N of topN is big, it may inefficient to keep both old and new values. One way to solve this case is to store the input record into the accumulator in `accumulate` method and then perform calculation in `emitUpdateWithRetract`.
+下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果,accumulator 保存了上一次的最大的2个值,也保存了当前最大的2个值。注意:如果 TopN 中的 n 非常大,这种既保存上次的结果,也保存当前的结果的方式不太高效。一种解决这种问题的方式是把输入数据直接存储到 `accumulator` 中,然后在调用 `emitUpdateWithRetract` 方法时再进行计算。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1332,14 +1322,14 @@ public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>
     }
 }
 
-// register function
+// 注册函数
 StreamTableEnvironment tEnv = ...
 tEnv.registerFunction("top2", new Top2());
 
-// init table
+// 初始化表
 Table tab = ...;
 
-// use function
+// 使用函数
 tab.groupBy("key")
     .flatAggregate("top2(a) as (v, rank)")
     .select("key, v, rank");
@@ -1409,10 +1399,10 @@ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
   }
 }
 
-// init table
+// 初始化表
 val tab = ...
 
-// use function
+// 使用函数
 tab
   .groupBy('key)
   .flatAggregate(top2('a) as ('v, 'rank))
@@ -1425,33 +1415,33 @@ tab
 
 {% top %}
 
-Best Practices for Implementing UDFs
+实现自定义函数的最佳实践
 ------------------------------------
 
-The Table API and SQL code generation internally tries to work with primitive values as much as possible. A user-defined function can introduce much overhead through object creation, casting, and (un)boxing. Therefore, it is highly recommended to declare parameters and result types as primitive types instead of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented as `int`. `Types.TIMESTAMP` can be represented as `long`. 
+在 Table API 和 SQL 的内部,代码生成会尽量的使用基础类型。自定义函数的参数及返回值类型是对象,会有很多的对象创建、转换(cast)、以及自动拆装箱的开销。因此,强烈建议使用基础类型来作为参数以及返回值的类型。`Types.DATE` 和 `Types.TIME` 可以用 `int` 来表示。`Types.TIMESTAMP` 可以用 `long` 来表示。
 
-We recommended that user-defined functions should be written by Java instead of Scala as Scala types pose a challenge for Flink's type extractor.
+我们建议自定义函数用 Java 来实现,而不是用 Scala 来实现,因为 Flink 的类型推导对 Scala 不是很友好。
 
 {% top %}
 
-Integrating UDFs with the Runtime
+自定义函数跟运行时集成
 ---------------------------------
 
-Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide `open()` and `close()` methods that can be overridden and provide similar functionality as the methods in `RichFunction` of DataSet or DataStream API.
+有时候自定义函数需要获取一些全局信息,或者在真正被调用之前做一些配置(setup)/清理(clean-up)的工作。自定义函数也提供了 `open()` 和 `close()` 方法,你可以重写这两个方法做到类似于 DataSet 或者 DataStream API 中 `RichFunction` 的功能。
 
-The `open()` method is called once before the evaluation method. The `close()` method after the last call to the evaluation method.
+`open()` 方法在求值方法被调用之前先调用。`close()` 方法在求值方法调用完之后被调用。
 
-The `open()` method provides a `FunctionContext` that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.
+`open()` 方法提供了一个 `FunctionContext`,它包含了一些自定义函数被执行时的上下文信息,比如 metric group、分布式文件缓存,或者是全局的作业参数等。
 
-The following information can be obtained by calling the corresponding methods of `FunctionContext`:
+下面的信息可以通过调用 `FunctionContext` 的对应的方法来获得:
 
-| Method                                | Description                                            |
+| 方法                                  | 描述                                                    |
 | :------------------------------------ | :----------------------------------------------------- |
-| `getMetricGroup()`                    | Metric group for this parallel subtask.                |
-| `getCachedFile(name)`                 | Local temporary file copy of a distributed cache file. |
-| `getJobParameter(name, defaultValue)` | Global job parameter value associated with given key.  |
+| `getMetricGroup()`                    | 执行该函数的 subtask 的 Metric Group。                   |
+| `getCachedFile(name)`                 | 分布式文件缓存的本地临时文件副本。                         |
+| `getJobParameter(name, defaultValue)` | 跟对应的 key 关联的全局参数值。                           |
 
-The following example snippet shows how to use `FunctionContext` in a scalar function for accessing a global job parameter:
+下面的例子展示了如何在一个标量函数中通过 `FunctionContext` 来获取一个全局的任务参数:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1462,8 +1452,8 @@ public class HashCode extends ScalarFunction {
 
     @Override
     public void open(FunctionContext context) throws Exception {
-        // access "hashcode_factor" parameter
-        // "12" would be the default value if parameter does not exist
+        // 获取参数 "hashcode_factor"
+        // 如果不存在,则使用默认值 "12"
         factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
     }
 
@@ -1475,18 +1465,18 @@ public class HashCode extends ScalarFunction {
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
 
-// set job parameter
+// 设置任务参数
 Configuration conf = new Configuration();
 conf.setString("hashcode_factor", "31");
 env.getConfig().setGlobalJobParameters(conf);
 
-// register the function
+// 注册函数
 tableEnv.registerFunction("hashCode", new HashCode());
 
-// use the function in Java Table API
+// 在 Java Table API 中使用函数
 myTable.select("string, string.hashCode(), hashCode(string)");
 
-// use the function in SQL
+// 在 SQL 中使用函数
 tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 </div>
@@ -1498,8 +1488,8 @@ object hashCode extends ScalarFunction {
   var hashcode_factor = 12
 
   override def open(context: FunctionContext): Unit = {
-    // access "hashcode_factor" parameter
-    // "12" would be the default value if parameter does not exist
+    // 获取参数 "hashcode_factor"
+    // 如果不存在,则使用默认值 "12"
     hashcode_factor = context.getJobParameter("hashcode_factor", "12").toInt
   }
 
@@ -1510,10 +1500,10 @@ object hashCode extends ScalarFunction {
 
 val tableEnv = BatchTableEnvironment.create(env)
 
-// use the function in Scala Table API
+// 在 Scala Table API 中使用函数
 myTable.select('string, hashCode('string))
 
-// register and use the function in SQL
+// 在 SQL 中注册和使用函数
 tableEnv.registerFunction("hashCode", hashCode)
 tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")
 {% endhighlight %}