You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/07/10 13:43:45 UTC

[flink] branch master updated: [FLINK-18526][python][docs] Add documentation for Python UDF on how to use managed memory

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f3eeb  [FLINK-18526][python][docs] Add documentation for Python UDF on how to use managed memory
32f3eeb is described below

commit 32f3eebea8840356aa6e488030ced75a1cbed123
Author: acqua.csq <ac...@alibaba-inc.com>
AuthorDate: Wed Jul 8 20:28:49 2020 +0800

    [FLINK-18526][python][docs] Add documentation for Python UDF on how to use managed memory
    
    This closes #12855.
---
 docs/dev/table/python/python_udfs.md               | 27 ++++++++++++++++++++++
 docs/dev/table/python/python_udfs.zh.md            | 27 ++++++++++++++++++++++
 docs/dev/table/python/vectorized_python_udfs.md    |  7 ++++++
 docs/dev/table/python/vectorized_python_udfs.zh.md |  7 ++++++
 4 files changed, 68 insertions(+)

diff --git a/docs/dev/table/python/python_udfs.md b/docs/dev/table/python/python_udfs.md
index ea01cd0..98ce5f0 100644
--- a/docs/dev/table/python/python_udfs.md
+++ b/docs/dev/table/python/python_udfs.md
@@ -48,6 +48,9 @@ class HashCode(ScalarFunction):
 
 table_env = BatchTableEnvironment.create(env)
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the Python function
 table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
 
@@ -58,6 +61,10 @@ my_table.select("string, bigint, bigint.hash_code(), hash_code(bigint)")
 table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
+
 It also supports to use Java/Scala scalar functions in Python Table API programs.
 
 {% highlight python %}
@@ -76,6 +83,9 @@ public class HashCode extends ScalarFunction {
 
 table_env = BatchTableEnvironment.create(env)
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the Java function
 table_env.register_java_function("hash_code", "my.java.function.HashCode")
 
@@ -86,6 +96,10 @@ my_table.select("string.hash_code(), hash_code(string)")
 table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
+
 There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`.
 The following examples show the different ways to define a Python scalar function which takes two columns of
 bigint as the input parameters and returns the sum of them as the result.
@@ -145,6 +159,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env)
 my_table = ...  # type: Table, table schema: [a: String]
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the Python Table Function
 table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
 
@@ -158,6 +175,9 @@ table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE
 
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
 
 It also supports to use Java/Scala table functions in Python Table API programs.
 {% highlight python %}
@@ -182,6 +202,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env)
 my_table = ...  # type: Table, table schema: [a: String]
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # Register the java function.
 table_env.register_java_function("split", "my.java.function.Split")
 
@@ -198,6 +221,10 @@ table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)
 table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
+
 Like Python scalar functions, you can use the above five ways to define Python TableFunctions.
 
 <span class="label label-info">Note</span> The only difference is that the return type of Python Table Functions needs to be an iterable, iterator or generator.
diff --git a/docs/dev/table/python/python_udfs.zh.md b/docs/dev/table/python/python_udfs.zh.md
index b19120e..1d83da5 100644
--- a/docs/dev/table/python/python_udfs.zh.md
+++ b/docs/dev/table/python/python_udfs.zh.md
@@ -48,6 +48,9 @@ class HashCode(ScalarFunction):
 
 table_env = BatchTableEnvironment.create(env)
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the Python function
 table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
 
@@ -58,6 +61,10 @@ my_table.select("string, bigint, bigint.hash_code(), hash_code(bigint)")
 table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
+
 It also supports to use Java/Scala scalar functions in Python Table API programs.
 
 {% highlight python %}
@@ -76,6 +83,9 @@ public class HashCode extends ScalarFunction {
 
 table_env = BatchTableEnvironment.create(env)
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the Java function
 table_env.register_java_function("hash_code", "my.java.function.HashCode")
 
@@ -86,6 +96,10 @@ my_table.select("string.hash_code(), hash_code(string)")
 table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
+
 There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`.
 The following examples show the different ways to define a Python scalar function which takes two columns of
 bigint as the input parameters and returns the sum of them as the result.
@@ -145,6 +159,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env)
 my_table = ...  # type: Table, table schema: [a: String]
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the Python Table Function
 table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
 
@@ -158,6 +175,9 @@ table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE
 
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
 
 It also supports to use Java/Scala table functions in Python Table API programs.
 {% highlight python %}
@@ -182,6 +202,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
 table_env = StreamTableEnvironment.create(env)
 my_table = ...  # type: Table, table schema: [a: String]
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # Register the java function.
 table_env.register_java_function("split", "my.java.function.Split")
 
@@ -198,6 +221,10 @@ table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)
 table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
 {% endhighlight %}
 
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
+
 Like Python scalar functions, you can use the above five ways to define Python TableFunctions.
 
 <span class="label label-info">Note</span> The only difference is that the return type of Python Table Functions needs to be an iterable, iterator or generator.
diff --git a/docs/dev/table/python/vectorized_python_udfs.md b/docs/dev/table/python/vectorized_python_udfs.md
index 8eec40b..ee5f03f 100644
--- a/docs/dev/table/python/vectorized_python_udfs.md
+++ b/docs/dev/table/python/vectorized_python_udfs.md
@@ -54,6 +54,9 @@ def add(i, j):
 
 table_env = BatchTableEnvironment.create(env)
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the vectorized Python scalar function
 table_env.register_function("add", add)
 
@@ -63,3 +66,7 @@ my_table.select("add(bigint, bigint)")
 # use the vectorized Python scalar function in SQL API
 table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
 {% endhighlight %}
+
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.
diff --git a/docs/dev/table/python/vectorized_python_udfs.zh.md b/docs/dev/table/python/vectorized_python_udfs.zh.md
index c50bc5b..b2d2ed9 100644
--- a/docs/dev/table/python/vectorized_python_udfs.zh.md
+++ b/docs/dev/table/python/vectorized_python_udfs.zh.md
@@ -54,6 +54,9 @@ def add(i, j):
 
 table_env = BatchTableEnvironment.create(env)
 
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
 # register the vectorized Python scalar function
 table_env.register_function("add", add)
 
@@ -63,3 +66,7 @@ my_table.select("add(bigint, bigint)")
 # use the vectorized Python scalar function in SQL API
 table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
 {% endhighlight %}
+
+<span class="label label-info">Note</span> If you are not using RocksDB as the state backend, you can also configure the Python
+worker to use the managed memory of the taskmanager by setting **python.fn-execution.memory.managed** to **true**.
+In that case, there is no need to set the configuration option **taskmanager.memory.task.off-heap.size**.