You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Yik San Chan <ev...@gmail.com> on 2021/04/27 12:01:53 UTC

ModuleNotFound when loading udf from another py file

Hi,

Here's the reproducible code sample:
https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3

I implemented my Python UDF by extending the ScalarFunction base class in a
separate file named decrypt_fun.py, and tried to import the UDF into my main
Python file named udf_use_resource.py.

However, after running `flink run`, I found the following error in the TaskManager log:

```
Caused by: java.lang.RuntimeException: Error received from SDK harness for
instruction 1: Traceback (most recent call last):
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 376, in get
processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 253, in _execute
response = task()
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 310, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 480, in do_instruction
getattr(request, request_type), request.instruction_id)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 509, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 382, in get
self.data_channel_factory)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 847, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 902, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 901, in <listcomp>
(transform_id, get_operation(transform_id)) for transform_id in sorted(
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 791, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 885, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 885, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 883, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 791, in wrapper
result = cache[args] = func(*args)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 888, in get_operation
transform_id, transform_consumers)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1174, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
line 39, in create_scalar_function
operations.ScalarFunctionOperation)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
line 166, in _create_user_defined_function_operation
internal_operation_cls)
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in
pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
line 91, in __init__
super(ScalarFunctionOperation, self).__init__(spec)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
line 56, in __init__
self.func, self.user_defined_funcs =
self.generate_func(self.spec.serialized_fn)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
line 105, in generate_func
[operation_utils.extract_user_defined_function(udf) for udf in
serialized_fn.udfs])
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
line 105, in <listcomp>
[operation_utils.extract_user_defined_function(udf) for udf in
serialized_fn.udfs])
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py",
line 86, in extract_user_defined_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File
"/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py",
line 29, in loads
return cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'decrypt_fun'

    at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_282]
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_282]
    ... 1 more
```

I wonder why? If I move the Decrypt class into udf_use_resource.py,
everything works just fine.

Thank you!

Best,
Yik San

Re: ModuleNotFound when loading udf from another py file

Posted by Dian Fu <di...@gmail.com>.
Thanks a lot~

> 2021年4月28日 上午8:25,Yik San Chan <ev...@gmail.com> 写道:
> 
> Hi Dian,
> 
> I followed up with this PR: https://github.com/apache/flink/pull/15790 <https://github.com/apache/flink/pull/15790>
> On Tue, Apr 27, 2021 at 11:03 PM Dian Fu <dian0511.fu@gmail.com <ma...@gmail.com>> wrote:
> Hi Yik San,
> 
> Makes sense to me. :)
> 
> Regards,
> Dian
> 
>> 2021年4月27日 下午9:50,Yik San Chan <evan.chanyiksan@gmail.com <ma...@gmail.com>> 写道:
>> 
>> Hi Dian,
>> 
>> Wow, this is unexpected 😮 How about adding documentation about this to the Python UDF docs? Otherwise it can be time-consuming to figure this out. Maybe:
>> 
>> To run Python UDFs in any non-local mode when your UDFs live outside the file where the main() function is defined, it is recommended to ship the files containing the UDF definitions using the -pyfs command-line option.
>> 
>> What do you think?
>> 
>> Best,
>> Yik San
>> 
>> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <dian0511.fu@gmail.com <ma...@gmail.com>> wrote:
>> I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to serialize Python UDFs.
>> 
>> In the latter case, I guess the whole Python UDF implementation is serialized by value. In the former case, however, only the module path of the class is serialized, so the module (decrypt_fun.py) must also be available on the workers.
>> 
>> Regards,
>> Dian
>> 
>>> 2021年4月27日 下午8:52,Yik San Chan <evan.chanyiksan@gmail.com <ma...@gmail.com>> 写道:
>>> 
>>> Hi Dian,
>>> 
>>> Thanks! Adding -pyfs definitely helps.
>>> 
>>> However, I am curious. If I define my udf this way:
>>> 
>>> ```python
>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>> def decrypt(s):
>>>     import pandas as pd
>>>     d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, index_col=0, squeeze=True).to_dict()
>>>     return d.get(s, "unknown")
>>> ```
>>> 
>>> I can `flink run` without having to specify -pyfs option. The code can also be found in the commit https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607 <https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607>. I wonder why?
>>> 
>>> Best,
>>> Yik San
>>> 
>>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <dian0511.fu@gmail.com <ma...@gmail.com>> wrote:
>>> Hi Yik San,
>>> 
>>> From the exception message, it’s clear that it could not find module `decrypt_fun` during execution.
>>> 
>>> You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the -pyfs command-line option. Otherwise, this file will not be available during execution.
>>> 
>>> Regards,
>>> Dian
>>> 
>>>> 2021年4月27日 下午8:01,Yik San Chan <evan.chanyiksan@gmail.com <ma...@gmail.com>> 写道:
>>>> 
>>>> Hi,
>>>> 
>>>> Here's the reproducible code sample: https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3 <https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3>
>>>> 
>>>> I implement my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and try to import the udf into my main python file named udf_use_resource.py.
>>>> 
>>>> However, after I `flink run`, I find the error log in TaskManager log:
>>>> 
>>>> ```
>>>> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
>>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>> IndexError: pop from empty list
>>>> 
>>>> During handling of the above exception, another exception occurred:
>>>> 
>>>> Traceback (most recent call last):
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>>>>     response = task()
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>>>>     lambda: self.create_worker().do_instruction(request), request)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>>>>     getattr(request, request_type), request.instruction_id)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
>>>>     instruction_id, request.process_bundle_descriptor_id)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
>>>>     self.data_channel_factory)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
>>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
>>>>     descriptor.transforms, key=topological_height, reverse=True)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
>>>>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
>>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
>>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
>>>>     transform_id, transform_consumers)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
>>>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
>>>>     operations.ScalarFunctionOperation)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
>>>>     internal_operation_cls)
>>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
>>>>     super(ScalarFunctionOperation, self).__init__(spec)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
>>>>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
>>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
>>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
>>>>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>>>>     return cloudpickle.loads(payload)
>>>> ModuleNotFoundError: No module named 'decrypt_fun'
>>>> 
>>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
>>>>     ... 1 more
>>>> ```
>>>> 
>>>> I wonder why? If I move the Decrypt class into udf_use_resource.py, everything works just fine.
>>>> 
>>>> Thank you!
>>>> 
>>>> Best,
>>>> Yik San
>>> 
>> 
> 


Re: ModuleNotFound when loading udf from another py file

Posted by Yik San Chan <ev...@gmail.com>.
Hi Dian,

I followed up with this PR: https://github.com/apache/flink/pull/15790

On Tue, Apr 27, 2021 at 11:03 PM Dian Fu <di...@gmail.com> wrote:

> Hi Yik San,
>
> Makes sense to me. :)
>
> Regards,
> Dian
>
> 2021年4月27日 下午9:50,Yik San Chan <ev...@gmail.com> 写道:
>
> Hi Dian,
>
> Wow, this is unexpected 😮 How about adding documentation about this to the
> Python UDF docs? Otherwise it can be time-consuming to figure this out. Maybe:
>
> To be able to run Python UDFs in any non-local mode, it is recommended to
> include your UDF definitions using -pyfs config option, if your UDFs live
> outside of the file where the main() function is defined.
>
> What do you think?
>
> Best,
> Yik San
>
> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <di...@gmail.com> wrote:
>
>> I guess this is the magic of cloudpickle. PyFlink depends on
>> cloudpickle to serialize Python UDFs.
>>
>> In the latter case, I guess the whole Python UDF implementation is
>> serialized by value. In the former case, however, only the module path of
>> the class is serialized, so the module (decrypt_fun.py) must also be
>> available on the workers.
>>
>> Regards,
>> Dian
>>
>> 2021年4月27日 下午8:52,Yik San Chan <ev...@gmail.com> 写道:
>>
>> Hi Dian,
>>
>> Thanks! Adding -pyfs definitely helps.
>>
>> However, I am curious. If I define my udf this way:
>>
>> ```python
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def decrypt(s):
>>     import pandas as pd
>>     d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
>>                     index_col=0, squeeze=True).to_dict()
>>     return d.get(s, "unknown")
>> ```
>>
>> I can `flink run` without having to specify -pyfs option. The code can
>> also be found in the commit
>> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607.
>> I wonder why?
>>
>> Best,
>> Yik San
>>
>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <di...@gmail.com> wrote:
>>
>>> Hi Yik San,
>>>
>>> From the exception message, it’s clear that it could not find module
>>> `decrypt_fun` during execution.
>>>
>>> You need to specify the file `decrypt_fun.py` as a dependency when
>>> submitting the job, e.g. via the -pyfs command-line option. Otherwise,
>>> this file will not be available during execution.
>>>
>>> Regards,
>>> Dian
>>>
>>> 2021年4月27日 下午8:01,Yik San Chan <ev...@gmail.com> 写道:
>>>
>>> Hi,
>>>
>>> Here's the reproducible code sample:
>>> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>>
>>> I implement my Python UDF by extending the ScalarFunction base class in
>>> a separate file named decrypt_fun.py, and try to import the udf into my
>>> main python file named udf_use_resource.py.
>>>
>>> However, after I `flink run`, I find the error log in TaskManager log:
>>>
>>> ```
>>> Caused by: java.lang.RuntimeException: Error received from SDK harness
>>> for instruction 1: Traceback (most recent call last):
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 376, in get
>>> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>>
>>> During handling of the above exception, another exception occurred:
>>>
>>> Traceback (most recent call last):
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 253, in _execute
>>> response = task()
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 310, in <lambda>
>>> lambda: self.create_worker().do_instruction(request), request)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 480, in do_instruction
>>> getattr(request, request_type), request.instruction_id)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 509, in process_bundle
>>> instruction_id, request.process_bundle_descriptor_id)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 382, in get
>>> self.data_channel_factory)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 847, in __init__
>>> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 902, in create_execution_tree
>>> descriptor.transforms, key=topological_height, reverse=True)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 901, in <listcomp>
>>> (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 791, in wrapper
>>> result = cache[args] = func(*args)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 885, in get_operation
>>> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 885, in <dictcomp>
>>> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 883, in <listcomp>
>>> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 791, in wrapper
>>> result = cache[args] = func(*args)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 888, in get_operation
>>> transform_id, transform_consumers)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1174, in create_operation
>>> return creator(self, transform_id, transform_proto, payload, consumers)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
>>> line 39, in create_scalar_function
>>> operations.ScalarFunctionOperation)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
>>> line 166, in _create_user_defined_function_operation
>>> internal_operation_cls)
>>> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in
>>> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>>> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in
>>> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>>> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in
>>> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>>> line 91, in __init__
>>> super(ScalarFunctionOperation, self).__init__(spec)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>>> line 56, in __init__
>>> self.func, self.user_defined_funcs =
>>> self.generate_func(self.spec.serialized_fn)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>>> line 105, in generate_func
>>> [operation_utils.extract_user_defined_function(udf) for udf in
>>> serialized_fn.udfs])
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>>> line 105, in <listcomp>
>>> [operation_utils.extract_user_defined_function(udf) for udf in
>>> serialized_fn.udfs])
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py",
>>> line 86, in extract_user_defined_function
>>> user_defined_func = pickle.loads(user_defined_function_proto.payload)
>>> File
>>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py",
>>> line 29, in loads
>>> return cloudpickle.loads(payload)
>>> ModuleNotFoundError: No module named 'decrypt_fun'
>>>
>>>     at
>>> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> ~[?:1.8.0_282]
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> ~[?:1.8.0_282]
>>>     ... 1 more
>>> ```
>>>
>>> I wonder why? If I move the Decrypt class into udf_use_resource.py,
>>> everything works just fine.
>>>
>>> Thank you!
>>>
>>> Best,
>>> Yik San
>>>
>>>
>>>
>>
>

Re: ModuleNotFound when loading udf from another py file

Posted by Dian Fu <di...@gmail.com>.
Hi Yik San,

Makes sense to me. :)

Regards,
Dian

> 2021年4月27日 下午9:50,Yik San Chan <ev...@gmail.com> 写道:
> 
> Hi Dian,
> 
> Wow, this is unexpected 😮 How about adding documentation about this to the Python UDF docs? Again, it can be time-consuming to figure this out. Maybe:
> 
> To run Python UDFs in any non-local mode, if your UDFs live outside of the file where the main() function is defined, it is recommended to ship the files containing them using the -pyfs option.
> 
> What do you think?
> 
> Best,
> Yik San
> 
> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <dian0511.fu@gmail.com <ma...@gmail.com>> wrote:
> I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to serialize the Python UDF. 
> 
> For the latter case, I guess the whole Python UDF implementation will be serialized. However, for the former case, only the path of the class is serialized.
> 
> Regards,
> Dian
> 
>> 2021年4月27日 下午8:52,Yik San Chan <evan.chanyiksan@gmail.com <ma...@gmail.com>> 写道:
>> 
>> Hi Dian,
>> 
>> Thanks! Adding -pyfs definitely helps.
>> 
>> However, I am curious. If I define my udf this way:
>> 
>> ```python
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def decrypt(s):
>>     import pandas as pd
>>     d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, index_col=0, squeeze=True).to_dict()
>>     return d.get(s, "unknown")
>> ```
>> 
>> I can `flink run` without having to specify -pyfs option. The code can also be found in the commit https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607 <https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607>. I wonder why?
>> 
>> Best,
>> Yik San
>> 
>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <dian0511.fu@gmail.com <ma...@gmail.com>> wrote:
>> Hi Yik San,
>> 
>> From the exception message, it’s clear that it could not find module `decrypt_fun` during execution.
>> 
>> You need to specify file `decrypt_fun.py` as a dependency during submitting the job, e.g. via -pyfs command line arguments. Otherwise, this file will not be available during execution.
>> 
>> Regards,
>> Dian
>> 
>>> 2021年4月27日 下午8:01,Yik San Chan <evan.chanyiksan@gmail.com <ma...@gmail.com>> 写道:
>>> 
>>> Hi,
>>> 
>>> Here's the reproducible code sample: https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3 <https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3>
>>> 
>>> I implement my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and try to import the udf into my main python file named udf_use_resource.py.
>>> 
>>> However, after I `flink run`, I find the error log in TaskManager log:
>>> 
>>> ```
>>> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>> IndexError: pop from empty list
>>> 
>>> During handling of the above exception, another exception occurred:
>>> 
>>> Traceback (most recent call last):
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>>>     response = task()
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>>>     lambda: self.create_worker().do_instruction(request), request)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>>>     getattr(request, request_type), request.instruction_id)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
>>>     instruction_id, request.process_bundle_descriptor_id)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
>>>     self.data_channel_factory)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
>>>     descriptor.transforms, key=topological_height, reverse=True)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
>>>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>     result = cache[args] = func(*args)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
>>>     transform_id, transform_consumers)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
>>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
>>>     operations.ScalarFunctionOperation)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
>>>     internal_operation_cls)
>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
>>>     super(ScalarFunctionOperation, self).__init__(spec)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
>>>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
>>>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>>>     return cloudpickle.loads(payload)
>>> ModuleNotFoundError: No module named 'decrypt_fun'
>>> 
>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
>>>     ... 1 more
>>> ```
>>> 
>>> I wonder why? If I move the Decrypt class into udf_use_resource.py, everything works just fine.
>>> 
>>> Thank you!
>>> 
>>> Best,
>>> Yik San
>> 
> 


Re: ModuleNotFound when loading udf from another py file

Posted by Yik San Chan <ev...@gmail.com>.
Hi Dian,

Wow, this is unexpected 😮 How about adding documentation about this to the
Python UDF docs? Again, it can be time-consuming to figure this out. Maybe:

To run Python UDFs in any non-local mode, if your UDFs live outside of the
file where the main() function is defined, it is recommended to ship the
files containing them using the -pyfs option.

What do you think?

Best,
Yik San

On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <di...@gmail.com> wrote:

> I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle
> to serialize the Python UDF.
>
> For the latter case, I guess the whole Python UDF implementation will be
> serialized. However, for the former case, only the path of the class is
> serialized.
>
> Regards,
> Dian
>
> 2021年4月27日 下午8:52,Yik San Chan <ev...@gmail.com> 写道:
>
> Hi Dian,
>
> Thanks! Adding -pyfs definitely helps.
>
> However, I am curious. If I define my udf this way:
>
> ```python
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def decrypt(s):
>     import pandas as pd
>     d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
>                     index_col=0, squeeze=True).to_dict()
>     return d.get(s, "unknown")
> ```
>
> I can `flink run` without having to specify -pyfs option. The code can
> also be found in the commit
> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607.
> I wonder why?
>
> Best,
> Yik San
>
> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <di...@gmail.com> wrote:
>
>> Hi Yik San,
>>
>> From the exception message, it’s clear that it could not find module
>> `decrypt_fun` during execution.
>>
>> You need to specify file `decrypt_fun.py` as a dependency during
>> submitting the job, e.g. via -pyfs command line arguments. Otherwise, this
>> file will not be available during execution.
>>
>> Regards,
>> Dian
>>
>> 2021年4月27日 下午8:01,Yik San Chan <ev...@gmail.com> 写道:
>>
>> Hi,
>>
>> Here's the reproducible code sample:
>> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>
>> I implement my Python UDF by extending the ScalarFunction base class in a
>> separate file named decrypt_fun.py, and try to import the udf into my main
>> python file named udf_use_resource.py.
>>
>> However, after I `flink run`, I find the error log in TaskManager log:
>>
>> ```
>> Caused by: java.lang.RuntimeException: Error received from SDK harness
>> for instruction 1: Traceback (most recent call last):
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 376, in get
>> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 253, in _execute
>> response = task()
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 310, in <lambda>
>> lambda: self.create_worker().do_instruction(request), request)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 480, in do_instruction
>> getattr(request, request_type), request.instruction_id)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 509, in process_bundle
>> instruction_id, request.process_bundle_descriptor_id)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
>> line 382, in get
>> self.data_channel_factory)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 847, in __init__
>> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 902, in create_execution_tree
>> descriptor.transforms, key=topological_height, reverse=True)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 901, in <listcomp>
>> (transform_id, get_operation(transform_id)) for transform_id in sorted(
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 791, in wrapper
>> result = cache[args] = func(*args)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 885, in get_operation
>> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 885, in <dictcomp>
>> pcoll_id in descriptor.transforms[transform_id].outputs.items()
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 883, in <listcomp>
>> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 791, in wrapper
>> result = cache[args] = func(*args)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 888, in get_operation
>> transform_id, transform_consumers)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1174, in create_operation
>> return creator(self, transform_id, transform_proto, payload, consumers)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
>> line 39, in create_scalar_function
>> operations.ScalarFunctionOperation)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
>> line 166, in _create_user_defined_function_operation
>> internal_operation_cls)
>> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in
>> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in
>> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in
>> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>> line 91, in __init__
>> super(ScalarFunctionOperation, self).__init__(spec)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>> line 56, in __init__
>> self.func, self.user_defined_funcs =
>> self.generate_func(self.spec.serialized_fn)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>> line 105, in generate_func
>> [operation_utils.extract_user_defined_function(udf) for udf in
>> serialized_fn.udfs])
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
>> line 105, in <listcomp>
>> [operation_utils.extract_user_defined_function(udf) for udf in
>> serialized_fn.udfs])
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py",
>> line 86, in extract_user_defined_function
>> user_defined_func = pickle.loads(user_defined_function_proto.payload)
>> File
>> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py",
>> line 29, in loads
>> return cloudpickle.loads(payload)
>> ModuleNotFoundError: No module named 'decrypt_fun'
>>
>>     at
>> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> ~[?:1.8.0_282]
>>     at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ~[?:1.8.0_282]
>>     ... 1 more
>> ```
>>
>> I wonder why? If I move the Decrypt class into udf_use_resource.py,
>> everything works just fine.
>>
>> Thank you!
>>
>> Best,
>> Yik San
>>
>>
>>
>

Re: ModuleNotFound when loading udf from another py file

Posted by Dian Fu <di...@gmail.com>.
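I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to serialize the Python UDF. 

For the latter case, I guess the whole Python UDF implementation will be serialized. However, for the former case, only the path of the class is serialized. cloudpickle serializes functions and classes defined in the `__main__` script by value. Objects imported from another module are pickled by reference (module name plus qualified name). That is why `decrypt_fun.py` must also be available on the workers, e.g. via -pyfs.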

Regards,
Dian

> 2021年4月27日 下午8:52,Yik San Chan <ev...@gmail.com> 写道:
> 
> Hi Dian,
> 
> Thanks! Adding -pyfs definitely helps.
> 
> However, I am curious. If I define my udf this way:
> 
> ```python
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def decrypt(s):
>     import pandas as pd
>     d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, index_col=0, squeeze=True).to_dict()
>     return d.get(s, "unknown")
> ```
> 
> I can `flink run` without having to specify -pyfs option. The code can also be found in the commit https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607 <https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607>. I wonder why?
> 
> Best,
> Yik San
> 
> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <dian0511.fu@gmail.com <ma...@gmail.com>> wrote:
> Hi Yik San,
> 
> From the exception message, it’s clear that it could not find module `decrypt_fun` during execution.
> 
> You need to specify file `decrypt_fun.py` as a dependency during submitting the job, e.g. via -pyfs command line arguments. Otherwise, this file will not be available during execution.
> 
> Regards,
> Dian
> 
>> 2021年4月27日 下午8:01,Yik San Chan <evan.chanyiksan@gmail.com <ma...@gmail.com>> 写道:
>> 
>> Hi,
>> 
>> Here's the reproducible code sample: https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3 <https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3>
>> 
>> I implement my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and try to import the udf into my main python file named udf_use_resource.py.
>> 
>> However, after I `flink run`, I find the error log in TaskManager log:
>> 
>> ```
>> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>> IndexError: pop from empty list
>> 
>> During handling of the above exception, another exception occurred:
>> 
>> Traceback (most recent call last):
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>>     response = task()
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>>     lambda: self.create_worker().do_instruction(request), request)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>>     getattr(request, request_type), request.instruction_id)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
>>     instruction_id, request.process_bundle_descriptor_id)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
>>     self.data_channel_factory)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
>>     descriptor.transforms, key=topological_height, reverse=True)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
>>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>     result = cache[args] = func(*args)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
>>     transform_id, transform_consumers)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
>>     operations.ScalarFunctionOperation)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
>>     internal_operation_cls)
>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
>>     super(ScalarFunctionOperation, self).__init__(spec)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
>>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
>>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>>     return cloudpickle.loads(payload)
>> ModuleNotFoundError: No module named 'decrypt_fun'
>> 
>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
>>     ... 1 more
>> ```
>> 
>> I wonder why? If I move the Decrypt class into udf_use_resource.py, everything works just fine.
>> 
>> Thank you!
>> 
>> Best,
>> Yik San
> 


Re: ModuleNotFound when loading udf from another py file

Posted by Yik San Chan <ev...@gmail.com>.
Hi Dian,

Thanks! Adding -pyfs definitely helps.

However, I am curious. If I define my udf this way:

```python
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    """Look up ``s`` in the crypt.csv table, returning "unknown" if absent.

    The CSV is read without a header row, and its first column is used as
    the lookup key (``index_col=0``). It presumably has exactly one value
    column, so the result is a key -> value mapping; confirm against the
    data file.
    """
    import pandas as pd

    # NOTE(review): relative path -- presumably resolved against the
    # directory where the job's resources archive is extracted on the
    # worker; confirm.
    # ``read_csv(squeeze=True)`` was deprecated in pandas 1.4 and removed in
    # 2.0; ``DataFrame.squeeze("columns")`` yields the same Series for a
    # single-value-column file and works on both old and new pandas.
    d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
                    index_col=0).squeeze("columns").to_dict()
    return d.get(s, "unknown")
```

I can `flink run` without having to specify the -pyfs option. The code can also
be found in this commit:
https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607.
Why is -pyfs not needed in that case?

Best,
Yik San

On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <di...@gmail.com> wrote:

> Hi Yik San,
>
> From the exception message, it’s clear that it could not find the module
> `decrypt_fun` during execution.
>
> You need to specify the file `decrypt_fun.py` as a dependency when
> submitting the job, e.g. via the -pyfs command-line argument. Otherwise, this
> file will not be available during execution.
>
> Regards,
> Dian
>
> 2021年4月27日 下午8:01,Yik San Chan <ev...@gmail.com> 写道:
>
> Hi,
>
> Here's the reproducible code sample:
> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>
> I implement my Python UDF by extending the ScalarFunction base class in a
> separate file named decrypt_fun.py, and try to import the udf into my main
> python file named udf_use_resource.py.
>
> However, after I `flink run`, I find the error log in TaskManager log:
>
> ```
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 1: Traceback (most recent call last):
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 376, in get
> processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
> IndexError: pop from empty list
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 253, in _execute
> response = task()
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 310, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
> getattr(request, request_type), request.instruction_id)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 509, in process_bundle
> instruction_id, request.process_bundle_descriptor_id)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 382, in get
> self.data_channel_factory)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 847, in __init__
> self.ops = self.create_execution_tree(self.process_bundle_descriptor)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 902, in create_execution_tree
> descriptor.transforms, key=topological_height, reverse=True)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 901, in <listcomp>
> (transform_id, get_operation(transform_id)) for transform_id in sorted(
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 791, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 885, in get_operation
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 885, in <dictcomp>
> pcoll_id in descriptor.transforms[transform_id].outputs.items()
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 883, in <listcomp>
> tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 791, in wrapper
> result = cache[args] = func(*args)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 888, in get_operation
> transform_id, transform_consumers)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1174, in create_operation
> return creator(self, transform_id, transform_proto, payload, consumers)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 39, in create_scalar_function
> operations.ScalarFunctionOperation)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py",
> line 166, in _create_user_defined_function_operation
> internal_operation_cls)
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in
> pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
> line 91, in __init__
> super(ScalarFunctionOperation, self).__init__(spec)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
> line 56, in __init__
> self.func, self.user_defined_funcs =
> self.generate_func(self.spec.serialized_fn)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
> line 105, in generate_func
> [operation_utils.extract_user_defined_function(udf) for udf in
> serialized_fn.udfs])
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py",
> line 105, in <listcomp>
> [operation_utils.extract_user_defined_function(udf) for udf in
> serialized_fn.udfs])
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py",
> line 86, in extract_user_defined_function
> user_defined_func = pickle.loads(user_defined_function_proto.payload)
> File
> "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py",
> line 29, in loads
> return cloudpickle.loads(payload)
> ModuleNotFoundError: No module named 'decrypt_fun'
>
>     at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_282]
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_282]
>     ... 1 more
> ```
>
> I wonder why this happens. If I move the Decrypt class into udf_use_resource.py,
> everything works just fine.
>
> Thank you!
>
> Best,
> Yik San
>
>
>

Re: ModuleNotFound when loading udf from another py file

Posted by Dian Fu <di...@gmail.com>.
Hi Yik San,

From the exception message, it’s clear that it could not find the module `decrypt_fun` during execution.

You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the -pyfs command-line argument. Otherwise, this file will not be available during execution.

Regards,
Dian

> 2021年4月27日 下午8:01,Yik San Chan <ev...@gmail.com> 写道:
> 
> Hi,
> 
> Here's the reproducible code sample: https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3 <https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3>
> 
> I implement my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and try to import the udf into my main python file named udf_use_resource.py.
> 
> However, after I `flink run`, I find the error log in TaskManager log:
> 
> ```
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
> IndexError: pop from empty list
> 
> During handling of the above exception, another exception occurred:
> 
> Traceback (most recent call last):
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
>     instruction_id, request.process_bundle_descriptor_id)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
>     self.data_channel_factory)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
>     transform_id, transform_consumers)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
>     operations.ScalarFunctionOperation)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
>     internal_operation_cls)
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
>     super(ScalarFunctionOperation, self).__init__(spec)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>     return cloudpickle.loads(payload)
> ModuleNotFoundError: No module named 'decrypt_fun'
> 
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
>     ... 1 more
> ```
> 
> I wonder why this happens. If I move the Decrypt class into udf_use_resource.py, everything works just fine.
> 
> Thank you!
> 
> Best,
> Yik San