Posted to user@flink.apache.org by Yik San Chan <ev...@gmail.com> on 2021/04/27 08:39:10 UTC

How to load resource in a PyFlink UDF

Hi,

My UDF depends on a resource file named crypt.csv, located in the
resources/ directory.

```python
# udf_use_resource.py
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    import pandas as pd
    d = pd.read_csv('resources/crypt.csv', header=None, index_col=0,
                    squeeze=True).to_dict()
    return d.get(s, "unknown")
```

The job runs without any problem in local mode (i.e.,
`python udf_use_resource.py`). However, when I submit it to my local
cluster with
`~/softwares/flink-1.12.0/bin/flink run -d -pyexec /usr/local/anaconda3/envs/featflow-ml-env/bin/python -pyarch resources.zip -py udf_use_resource.py`,
it fails with:

FileNotFoundError: [Errno 2] File b'resources/crypt.csv' does not exist: b'resources/crypt.csv'

resources.zip was created by zipping the resources directory. Where am I
going wrong?
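A quick way to see what is actually inside the archive is to list its entries with Python's standard `zipfile` module. The sketch below is illustrative and not part of the original post: it assumes the archive was built by zipping the `resources` folder itself, and the helper name `zip_directory` is hypothetical. Entries created this way keep the `resources/` prefix, which matters once the archive is extracted somewhere on the cluster.

```python
import os
import tempfile
import zipfile


def zip_directory(src_dir, archive_path):
    """Hypothetical helper: zip src_dir so that each entry keeps the folder's
    own name as a prefix, like zipping the `resources` folder itself."""
    parent = os.path.dirname(os.path.abspath(src_dir))
    with zipfile.ZipFile(archive_path, "w") as zf:
        for root, _dirs, files in os.walk(src_dir):
            for name in files:
                full_path = os.path.join(root, name)
                # Paths are taken relative to the *parent* directory,
                # so the "resources/" prefix is stored in the archive.
                zf.write(full_path, arcname=os.path.relpath(full_path, parent))
    # Report the entry names actually stored in the archive.
    with zipfile.ZipFile(archive_path) as zf:
        return zf.namelist()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        resources = os.path.join(tmp, "resources")
        os.makedirs(resources)
        with open(os.path.join(resources, "crypt.csv"), "w") as f:
            f.write("a,apple\n")
        print(zip_directory(resources, os.path.join(tmp, "resources.zip")))
        # prints ['resources/crypt.csv']
```

Because the stored path is `resources/crypt.csv`, extracting such an archive into any directory `D` yields `D/resources/crypt.csv`, not `resources/crypt.csv` relative to the worker's working directory.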

Note: udf_use_resource.py and resources/crypt.csv can be found at
https://github.com/YikSanChan/pyflink-quickstart/tree/36bfab4ff830f57d3f23f285c7c5499a03385b71

Thanks!

Best,
Yik San

Re: How to load resource in a PyFlink UDF

Posted by Yik San Chan <ev...@gmail.com>.
Hi Dian,

Thank you for the detailed answer!

Best,
Yik San

On Tue, Apr 27, 2021 at 5:42 PM Dian Fu <di...@gmail.com> wrote:

> Hi Yik San,
>
> Command line option `-pyarch` could be used to specify archive files such
> as Python virtual environment, ML model, data file, etc.
>
> So for resources.zip, -pyarch makes more sense than -pyfs.
>
> Regards,
> Dian

Re: How to load resource in a PyFlink UDF

Posted by Dian Fu <di...@gmail.com>.
Hi Yik San,

The command-line option `-pyarch` can be used to specify archive files such as a Python virtual environment, an ML model, data files, etc.

So for resources.zip, `-pyarch` is a better fit than `-pyfs`.
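For context, Flink's CLI documentation describes the two options differently (worth checking against the docs for your Flink version): files passed with `-pyfs` are added to the PYTHONPATH of the client and the Python UDF worker, which suits Python code, while archives passed with `-pyarch` are extracted into the Python UDF worker's working directory, into a directory named after the archive unless a target name is given after a `#`, as in `resources.zip#resources`. The sketch below only models that extraction rule with the standard library; it is not Flink code, and `pyarch_target_dir` and `simulate_pyarch` are hypothetical names. It shows one option that would keep the original `resources/crypt.csv` path working: store `crypt.csv` at the archive root and request the `resources` target directory.

```python
import os
import tempfile
import zipfile


def pyarch_target_dir(spec):
    """Model of the documented rule: 'archive.zip#name' is extracted into
    'name'; a bare 'archive.zip' into a directory called 'archive.zip'."""
    archive, sep, target = spec.partition("#")
    return target if sep else os.path.basename(archive)


def simulate_pyarch(spec, workdir):
    """Extract the archive under workdir following the rule above and
    return the directory it was extracted into (simulation only)."""
    archive = spec.partition("#")[0]
    dest = os.path.join(workdir, pyarch_target_dir(spec))
    with zipfile.ZipFile(archive) as zf:
        zf.extractall(dest)
    return dest


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        archive = os.path.join(tmp, "resources.zip")
        # Store crypt.csv at the archive root instead of under "resources/".
        with zipfile.ZipFile(archive, "w") as zf:
            zf.writestr("crypt.csv", "a,apple\n")
        workdir = os.path.join(tmp, "worker")
        os.makedirs(workdir)
        simulate_pyarch(archive + "#resources", workdir)
        # The UDF's original relative path now resolves under the worker dir.
        print(os.path.isfile(os.path.join(workdir, "resources", "crypt.csv")))
        # prints True
```

Under that model, the submission would use `-pyarch resources.zip#resources`; with the original archive layout (entries under `resources/`), the same target would instead place the file at `resources/resources/crypt.csv`.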

Regards,
Dian

> On Apr 27, 2021, at 5:14 PM, Yik San Chan <ev...@gmail.com> wrote:
> 
> Hi Dian,
> 
> Thank you! That solves my question. By the way, for my use case, does -pyarch make more sense than -pyfs?
> 
> Best,
> Yik San

Re: How to load resource in a PyFlink UDF

Posted by Yik San Chan <ev...@gmail.com>.
Hi Dian,

Thank you! That solves my problem. By the way, for my use case, does
`-pyarch` make more sense than `-pyfs`?

Best,
Yik San

On Tue, Apr 27, 2021 at 4:52 PM Dian Fu <di...@gmail.com> wrote:

> Hi Yik San,
>
> Could you try `pd.read_csv('resources.zip/resources/crypt.csv', xxx)`?
>
> Regards,
> Dian

Re: How to load resource in a PyFlink UDF

Posted by Dian Fu <di...@gmail.com>.
Hi Yik San,

Could you try `pd.read_csv('resources.zip/resources/crypt.csv', xxx)`?
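A minimal sketch of a loader built around that path. It assumes, per the `-pyarch` description in Flink's CLI documentation, that an archive passed without a target name is extracted into a directory named after the archive (here `resources.zip`) inside the Python UDF worker's working directory, so the CSV ends up at `resources.zip/resources/crypt.csv`. To stay dependency-free it swaps pandas for the standard `csv` module (values come back as strings), caches the table with `functools.lru_cache` so the file is read once per worker process rather than on every call, and falls back to `resources/crypt.csv` for local runs. `CANDIDATE_PATHS` and `load_crypt_table` are hypothetical names.

```python
import csv
import functools
import os

# Hypothetical candidate locations, relative to the Python worker's working
# directory. The first assumes -pyarch extracted resources.zip into a folder
# named "resources.zip"; the second is the plain path used in local mode.
CANDIDATE_PATHS = (
    os.path.join("resources.zip", "resources", "crypt.csv"),
    os.path.join("resources", "crypt.csv"),
)


@functools.lru_cache(maxsize=None)
def load_crypt_table():
    """Read the two-column CSV once per process and cache the resulting dict."""
    for path in CANDIDATE_PATHS:
        if os.path.exists(path):
            with open(path, newline="") as f:
                # Column 0 is the key, column 1 the value, mirroring
                # read_csv(header=None, index_col=0) on a two-column file.
                return {row[0]: row[1] for row in csv.reader(f) if len(row) >= 2}
    raise FileNotFoundError(f"crypt.csv not found in any of {CANDIDATE_PATHS}")


def decrypt(s):
    # In the PyFlink job this function would keep its @udf(...) decorator;
    # it is left off here so the sketch runs without PyFlink installed.
    return load_crypt_table().get(s, "unknown")
```

Caching matters here because a scalar UDF body runs once per row, so re-reading the CSV inside it repeats the file I/O for every record.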

Regards,
Dian
