Posted to user@flink.apache.org by Andrew Otto <ot...@wikimedia.org> on 2023/02/14 19:51:16 UTC

Reusing the same OutputTag in multiple ProcessFunctions

Hi,

I'm attempting to implement a generic error-handling ProcessFunction in
pyflink.  Given a user-provided function, I want to invoke that function
for each element in the DataStream, catch any errors thrown by the
function, convert those errors into events, and then emit those error
events to a different DataStream sink.
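
Roughly, the pattern I have in mind looks like this (a simplified sketch;
the wrapper class, tag name, and error event format are just illustrative):

from pyflink.common.typeinfo import Types
from pyflink.datastream import OutputTag
from pyflink.datastream.functions import ProcessFunction

error_output_tag = OutputTag('errors', Types.STRING())

class ErrorHandlingProcessFunction(ProcessFunction):
    def __init__(self, user_fn, error_tag):
        self.user_fn = user_fn
        self.error_tag = error_tag

    def process_element(self, value, ctx):
        try:
            # Invoke the user-provided function on each element.
            yield self.user_fn(value)
        except Exception as error:
            # Convert the error into an event and emit it to the error side output.
            yield self.error_tag, f'error processing {value!r}: {error}'

# Usage (input_stream and my_fn are placeholders):
processed = input_stream.process(ErrorHandlingProcessFunction(my_fn, error_output_tag))
errors = processed.get_side_output(error_output_tag)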

I'm trying to do this by reusing the same OutputTag in each of my
ProcessFunctions.
However, this does not work. I believe it is because I am using the same
error_output_tag in two different functions, which causes it to hold a
reference(?) to a _thread.RLock, which in turn makes the ProcessFunction
instance unpicklable.

Here's a standalone example
<https://gist.github.com/ottomata/cba55f2c65cf584ffdb933410f3b4237> of the
failure using the canonical word_count example.

My questions are:
1. Does Flink support re-use of the same OutputTag instance in multiple
ProcessFunctions?
2. If so, is my problem pyflink / python / pickle specific?

Thanks!
-Andrew Otto
 Wikimedia Foundation

Re: Reusing the same OutputTag in multiple ProcessFunctions

Posted by Andrew Otto <ot...@wikimedia.org>.
Wow, thank you so much!  Good to know it's not just me.

At the end of my day yesterday, I started sniffing this out too.  I think I
effectively did the same thing as setting _j_typeinfo to None by manually
recreating the _j_typeinfo in a new ('cloned') output tag:

from pyflink.common.typeinfo import TypeInformation, _from_java_type
from pyflink.datastream import OutputTag

def clone_type_info(type_info: TypeInformation) -> TypeInformation:
    # Rebuild a new Python TypeInformation from the Java type info
    # (effectively the same as resetting _j_typeinfo on the original).
    return _from_java_type(type_info.get_java_type_info())

def clone_output_tag(tag: OutputTag) -> OutputTag:
    # Make a new OutputTag with the same id but a freshly cloned type info.
    return OutputTag(tag.tag_id, clone_type_info(tag.type_info))

Then, every time I need to use an OutputTag (or any function that will
enclose a _j_typeinfo), I make sure that object is 'cloned'.
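
For example, roughly (a sketch; the stream, function, and wrapper names are
made up):

from pyflink.common.typeinfo import Types

error_tag = OutputTag('errors', Types.STRING())

# First use: this ProcessFunction encloses the original tag instance.
ds1 = input_stream.process(ErrorHandlingProcessFunction(fn1, error_tag))
errors1 = ds1.get_side_output(error_tag)

# Subsequent uses get a clone, so no cached _j_typeinfo reference is
# dragged along when the ProcessFunction is pickled.
ds2 = ds1.process(ErrorHandlingProcessFunction(fn2, clone_output_tag(error_tag)))
errors2 = ds2.get_side_output(clone_output_tag(error_tag))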

Thanks so much for the bugfix!  Looking forward to it!


On Wed, Feb 15, 2023 at 4:41 AM Juntao Hu <ma...@gmail.com> wrote:

> Hi Andrew,
>
> I've found out that this's a bug brought by another bugfix FLINK-29681
> <https://issues.apache.org/jira/browse/FLINK-29681>, I've created an
> issue FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083> for
> this problem. You could temporarily set inner java type_info to None before
> reusing the ProcessFunction to work around in your code, e.g.
> ```python
> side_output_ds1 = processed_ds1.get_side_output(output_tag1)
> output_tag1.type_info._j_typeinfo = None
> processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
> ```
>
> Thanks for reporting!
>
> David Anderson <da...@apache.org> 于2023年2月15日周三 14:03写道:
>
>> I can't respond to the python-specific aspects of this situation, but
>> I don't believe you need to use the same OutputTag instance. It should
>> be enough that the various tag instances involved all have the same
>> String id. (That's why the id exists.)
>>
>> David
>>
>> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto <ot...@wikimedia.org> wrote:
>> >
>> > Hi,
>> >
>> > I'm attempting to implement a generic error handling ProcessFunction in
>> pyflink.  Given a user provided function, I want to invoke that function
>> for each element in the DataStream, catch any errors thrown by the
>> function, convert those errors into events, and then emit those event
>> errors to a different DataStream sink.
>> >
>> > I'm trying to do this by reusing the same OutputTag in each of my
>> ProcessFunctions.
>> > However, this does not work, I believe because I am using the same
>> error_output_tag in two different functions, which causes it to have a
>> reference(?)  to _thread.Rlock, which causes the ProcessFunction instance
>> to be un-pickleable.
>> >
>> > Here's a standalone example of the failure using the canonical
>> word_count example.
>> >
>> > My question is.
>> > 1. Does Flink support re-use of the same OutputTag instance in multiple
>> ProcessFunctions?
>> > 2. If so, is my problem pyflink / python / pickle specific?
>> >
>> > Thanks!
>> > -Andrew Otto
>> >  Wikimedia Foundation
>> >
>> >
>>
>

Re: Reusing the same OutputTag in multiple ProcessFunctions

Posted by Juntao Hu <ma...@gmail.com>.
Hi Andrew,

I've found out that this is a bug introduced by another bugfix, FLINK-29681
<https://issues.apache.org/jira/browse/FLINK-29681>, and I've created an issue,
FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083>, for this
problem. As a workaround in your code, you could temporarily set the inner
Java type_info to None before reusing the ProcessFunction, e.g.
```python
side_output_ds1 = processed_ds1.get_side_output(output_tag1)
# Reset the cached Java type info so the tag can be pickled again.
output_tag1.type_info._j_typeinfo = None
processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
```

Thanks for reporting!

David Anderson <da...@apache.org> 于2023年2月15日周三 14:03写道:

> I can't respond to the python-specific aspects of this situation, but
> I don't believe you need to use the same OutputTag instance. It should
> be enough that the various tag instances involved all have the same
> String id. (That's why the id exists.)
>
> David
>
> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto <ot...@wikimedia.org> wrote:
> >
> > Hi,
> >
> > I'm attempting to implement a generic error handling ProcessFunction in
> pyflink.  Given a user provided function, I want to invoke that function
> for each element in the DataStream, catch any errors thrown by the
> function, convert those errors into events, and then emit those event
> errors to a different DataStream sink.
> >
> > I'm trying to do this by reusing the same OutputTag in each of my
> ProcessFunctions.
> > However, this does not work, I believe because I am using the same
> error_output_tag in two different functions, which causes it to have a
> reference(?)  to _thread.Rlock, which causes the ProcessFunction instance
> to be un-pickleable.
> >
> > Here's a standalone example of the failure using the canonical
> word_count example.
> >
> > My question is.
> > 1. Does Flink support re-use of the same OutputTag instance in multiple
> ProcessFunctions?
> > 2. If so, is my problem pyflink / python / pickle specific?
> >
> > Thanks!
> > -Andrew Otto
> >  Wikimedia Foundation
> >
> >
>

Re: Reusing the same OutputTag in multiple ProcessFunctions

Posted by David Anderson <da...@apache.org>.
I can't respond to the python-specific aspects of this situation, but
I don't believe you need to use the same OutputTag instance. It should
be enough that the various tag instances involved all have the same
String id. (That's why the id exists.)
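
I.e., constructing a separate tag in each function, as long as the ids
match, e.g. (illustrative pyflink sketch, untested):

from pyflink.common.typeinfo import Types
from pyflink.datastream import OutputTag

# Two separately constructed tags that share the same string id refer to
# the same logical side output.
errors_tag_a = OutputTag('errors', Types.STRING())
errors_tag_b = OutputTag('errors', Types.STRING())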

David

On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto <ot...@wikimedia.org> wrote:
>
> Hi,
>
> I'm attempting to implement a generic error handling ProcessFunction in pyflink.  Given a user provided function, I want to invoke that function for each element in the DataStream, catch any errors thrown by the function, convert those errors into events, and then emit those event errors to a different DataStream sink.
>
> I'm trying to do this by reusing the same OutputTag in each of my ProcessFunctions.
> However, this does not work, I believe because I am using the same error_output_tag in two different functions, which causes it to have a reference(?)  to _thread.Rlock, which causes the ProcessFunction instance to be un-pickleable.
>
> Here's a standalone example of the failure using the canonical word_count example.
>
> My question is.
> 1. Does Flink support re-use of the same OutputTag instance in multiple ProcessFunctions?
> 2. If so, is my problem pyflink / python / pickle specific?
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>