Posted to user@flink.apache.org by "Sharipov, Rinat" <r....@cleverdata.ru> on 2020/10/12 17:56:05 UTC

PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Hi mates !

I'm very new to PyFlink and I'm trying to register a custom UDF using the
Python API.
I'm currently facing the same issue both in the server environment and in
my local IDE environment.

When I try to execute the example below, I get the error message: *The
configured Task Off-Heap Memory 0 bytes is less than the least required
Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
using the configuration key 'taskmanager.memory.task.off-heap.size'*

Of course I've added the required property to *flink-conf.yaml *and checked
that *pyflink-shell.sh *initializes the environment with the specified
configuration, but it has no effect and I still get the error.

I've also attached my flink-conf.yaml file.

Thx for your help !

*Here is an example:*

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [
            ("user-1", "http://url/1"),
            ("user-2", "http://url/2"),
            ("user-1", "http://url/3"),
            ("user-3", "http://url/4"),
            ("user-1", "http://url/3")
        ],
        [
            "uid", "url"
        ]
    )

    my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
    bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

    bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()

Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Posted by "Sharipov, Rinat" <r....@cleverdata.ru>.
The main source of confusion for me was that *pyflink-shell *calls the
environment configuration script, so I expected it to configure the stream/
table environment, since it sets the path to *flink-conf.yaml*

Here is a code sample from *pyflink-shell*

[image: Screenshot 2020-10-13 at 11.59.08.png]



Best,
Rinat


Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

From my point of view, pyflink-shell is just an interactive tool.
Underneath, you can choose whether to run the job in a minicluster (similar
to python xxx.py) or to submit it to the cluster through flink run. For
python xxx.py, it is reasonable not to load the configuration from
flink-conf.yaml. What do you think?
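
To make the distinction concrete, a sketch of the two modes (the script
name my_job.py is illustrative):

```shell
# Submitted through `flink run`: the client reads conf/flink-conf.yaml
# from the Flink distribution, so a taskmanager.memory.task.off-heap.size
# entry set there is honored.
./bin/flink run -py my_job.py

# Run directly: PyFlink starts a local minicluster and does not read
# flink-conf.yaml, so the off-heap setting there is ignored.
python my_job.py
```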


Best,
Xingbo


Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Posted by "Sharipov, Rinat" <r....@cleverdata.ru>.
Hi Xingbo, thx a lot, it works !

But I'm still convinced that, from a user's point of view, it's not
obvious that *pyflink-shell.sh *doesn't use the provided flink-conf.yaml;
don't you think that looks like an issue ?

Thx !


Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

You can use the API to set the configuration:

table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')

The flink-conf.yaml way only takes effect when the job is submitted
through flink run; in the minicluster mode (python xxx.py) it does not
take effect.
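
For completeness, here is how that call slots into the original example
(a sketch against the PyFlink 1.11 API, not a tested program):

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)

    # Reserve off-heap memory for the Python UDF worker before building
    # the job; without this, the minicluster run fails with "The
    # configured Task Off-Heap Memory 0 bytes is less than the least
    # required Python worker Memory 79 mb".
    bt_env.get_config().get_configuration().set_string(
        "taskmanager.memory.task.off-heap.size", "80m")

    bt_env.register_function("test_udf", test_udf)
    # ... define tables and run queries as in the original example ...
```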

Best,
Xingbo
