You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/07/10 13:43:45 UTC
[flink] branch master updated: [FLINK-18526][python][docs] Add
documentation for Python UDF on how to use managed memory
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 32f3eeb [FLINK-18526][python][docs] Add documentation for Python UDF on how to use managed memory
32f3eeb is described below
commit 32f3eebea8840356aa6e488030ced75a1cbed123
Author: acqua.csq <ac...@alibaba-inc.com>
AuthorDate: Wed Jul 8 20:28:49 2020 +0800
[FLINK-18526][python][docs] Add documentation for Python UDF on how to use managed memory
This closes #12855.
---
docs/dev/table/python/python_udfs.md | 27 ++++++++++++++++++++++
docs/dev/table/python/python_udfs.zh.md | 27 ++++++++++++++++++++++
docs/dev/table/python/vectorized_python_udfs.md | 7 ++++++
docs/dev/table/python/vectorized_python_udfs.zh.md | 7 ++++++
4 files changed, 68 insertions(+)
diff --git a/docs/dev/table/python/python_udfs.md b/docs/dev/table/python/python_udfs.md
index ea01cd0..98ce5f0 100644
--- a/docs/dev/table/python/python_udfs.md
+++ b/docs/dev/table/python/python_udfs.md
@@ -48,6 +48,9 @@ class HashCode(ScalarFunction):
table_env = BatchTableEnvironment.create(env)
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the Python function
table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
@@ -58,6 +61,10 @@ my_table.select("string, bigint, bigint.hash_code(), hash_code(bigint)")
table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
+
It also supports to use Java/Scala scalar functions in Python Table API programs.
{% highlight python %}
@@ -76,6 +83,9 @@ public class HashCode extends ScalarFunction {
table_env = BatchTableEnvironment.create(env)
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the Java function
table_env.register_java_function("hash_code", "my.java.function.HashCode")
@@ -86,6 +96,10 @@ my_table.select("string.hash_code(), hash_code(string)")
table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
+
There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`.
The following examples show the different ways to define a Python scalar function which takes two columns of
bigint as the input parameters and returns the sum of them as the result.
@@ -145,6 +159,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
my_table = ... # type: Table, table schema: [a: String]
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the Python Table Function
table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
@@ -158,6 +175,9 @@ table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
It also supports to use Java/Scala table functions in Python Table API programs.
{% highlight python %}
@@ -182,6 +202,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
my_table = ... # type: Table, table schema: [a: String]
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# Register the java function.
table_env.register_java_function("split", "my.java.function.Split")
@@ -198,6 +221,10 @@ table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)
table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
+
Like Python scalar functions, you can use the above five ways to define Python TableFunctions.
<span class="label label-info">Note</span> The only difference is that the return type of Python Table Functions needs to be an iterable, iterator or generator.
diff --git a/docs/dev/table/python/python_udfs.zh.md b/docs/dev/table/python/python_udfs.zh.md
index b19120e..1d83da5 100644
--- a/docs/dev/table/python/python_udfs.zh.md
+++ b/docs/dev/table/python/python_udfs.zh.md
@@ -48,6 +48,9 @@ class HashCode(ScalarFunction):
table_env = BatchTableEnvironment.create(env)
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the Python function
table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
@@ -58,6 +61,10 @@ my_table.select("string, bigint, bigint.hash_code(), hash_code(bigint)")
table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
+
It also supports to use Java/Scala scalar functions in Python Table API programs.
{% highlight python %}
@@ -76,6 +83,9 @@ public class HashCode extends ScalarFunction {
table_env = BatchTableEnvironment.create(env)
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the Java function
table_env.register_java_function("hash_code", "my.java.function.HashCode")
@@ -86,6 +96,10 @@ my_table.select("string.hash_code(), hash_code(string)")
table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
+
There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`.
The following examples show the different ways to define a Python scalar function which takes two columns of
bigint as the input parameters and returns the sum of them as the result.
@@ -145,6 +159,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
my_table = ... # type: Table, table schema: [a: String]
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the Python Table Function
table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
@@ -158,6 +175,9 @@ table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
It also supports to use Java/Scala table functions in Python Table API programs.
{% highlight python %}
@@ -182,6 +202,9 @@ env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
my_table = ... # type: Table, table schema: [a: String]
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# Register the java function.
table_env.register_java_function("split", "my.java.function.Split")
@@ -198,6 +221,10 @@ table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)
table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
{% endhighlight %}
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
+
Like Python scalar functions, you can use the above five ways to define Python TableFunctions.
<span class="label label-info">Note</span> The only difference is that the return type of Python Table Functions needs to be an iterable, iterator or generator.
diff --git a/docs/dev/table/python/vectorized_python_udfs.md b/docs/dev/table/python/vectorized_python_udfs.md
index 8eec40b..ee5f03f 100644
--- a/docs/dev/table/python/vectorized_python_udfs.md
+++ b/docs/dev/table/python/vectorized_python_udfs.md
@@ -54,6 +54,9 @@ def add(i, j):
table_env = BatchTableEnvironment.create(env)
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the vectorized Python scalar function
table_env.register_function("add", add)
@@ -63,3 +66,7 @@ my_table.select("add(bigint, bigint)")
# use the vectorized Python scalar function in SQL API
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
{% endhighlight %}
+
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.
diff --git a/docs/dev/table/python/vectorized_python_udfs.zh.md b/docs/dev/table/python/vectorized_python_udfs.zh.md
index c50bc5b..b2d2ed9 100644
--- a/docs/dev/table/python/vectorized_python_udfs.zh.md
+++ b/docs/dev/table/python/vectorized_python_udfs.zh.md
@@ -54,6 +54,9 @@ def add(i, j):
table_env = BatchTableEnvironment.create(env)
+# configure the off-heap memory of the current taskmanager so that the Python worker can use off-heap memory.
+table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
+
# register the vectorized Python scalar function
table_env.register_function("add", add)
@@ -63,3 +66,7 @@ my_table.select("add(bigint, bigint)")
# use the vectorized Python scalar function in SQL API
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
{% endhighlight %}
+
+<span class="label label-info">Note</span> If not using RocksDB as state backend, you can also configure the python
+worker to use the managed memory of taskmanager by setting **python.fn-execution.memory.managed** to be **true**.
+Then there is no need to set the configuration **taskmanager.memory.task.off-heap.size**.