Posted to user@flink.apache.org by Juho Autio <ju...@rovio.com> on 2018/01/12 08:05:07 UTC

SideOutput doesn't receive anything if filter is applied after the process function

When I run the code below (Flink 1.4.0 or 1.3.1), only "a" is printed. If I
switch the position of .process() & .filter() (i.e. filter first, then
process), both "a" & "b" are printed, as expected.

I guess it's a bit hard to say what the side output should include in this
case: the stream before filtering or after it?

What I would suggest is that Flink protect against this kind of user error,
which is hard to debug. Would it be possible for Flink to throw an exception
if one calls .getSideOutput() on an operator that doesn't actually provide
that side output? Now that I think of it, this seems like a bug to me: why
does the call to getSideOutput succeed if it doesn't provide _any_ input? I
would expect it to get the side output data stream from _before_ .filter()
is applied.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputProblem {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.fromElements("a", "b");
        OutputTag<String> sideOutputTag = new OutputTag<String>("side-output"){};

        SingleOutputStreamOperator<String> processed = stream

                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
                        if ("a".equals(s)) {
                            collector.collect(s);
                        } else {
                            context.output(sideOutputTag, s);
                        }
                    }
                })

                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return true;
                    }
                });

        // `processed` holds the filter() result here, not the process() result
        processed.getSideOutput(sideOutputTag).printToErr();

        processed.print();

        env.execute();
    }

}

Cheers,
Juho
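The behavior reported above can be reproduced with a small, self-contained toy model. Everything in it is hypothetical (ToyNode, SideOutputRoutingDemo and the "main" tag are illustration only, not Flink's API). It assumes, as Gordon describes later in this thread, that getSideOutput() attaches a virtual node listening for one tag on the specific operator it is called on. Because the pass-through filter never emits anything under the side-output tag, a side output taken from the filter is empty, while one taken from the process step contains "b".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical toy model, NOT Flink's API: a node records everything it
// emitted, keyed by output tag ("main" for the regular output).
class ToyNode {
    static final String MAIN = "main";
    private final Map<String, List<String>> outputs = new HashMap<>();

    void emit(String tag, String value) {
        outputs.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
    }

    List<String> mainOutput() {
        return sideOutput(MAIN);
    }

    // Models getSideOutput(): it listens to one tag on THIS node only,
    // never on any node further upstream.
    List<String> sideOutput(String tag) {
        return outputs.getOrDefault(tag, Collections.emptyList());
    }
}

class SideOutputRoutingDemo {
    static final String SIDE = "side-output";

    // Mirrors the ProcessFunction in the reproducer: "a" goes to the main
    // output, everything else to the side output.
    static ToyNode process(List<String> input) {
        ToyNode node = new ToyNode();
        for (String s : input) {
            if ("a".equals(s)) {
                node.emit(ToyNode.MAIN, s);
            } else {
                node.emit(SIDE, s);
            }
        }
        return node;
    }

    // A filter only sees its upstream's main output and only emits to its
    // own main output, so it never has anything under the SIDE tag.
    static ToyNode filter(ToyNode upstream, Predicate<String> predicate) {
        ToyNode node = new ToyNode();
        for (String s : upstream.mainOutput()) {
            if (predicate.test(s)) {
                node.emit(ToyNode.MAIN, s);
            }
        }
        return node;
    }

    public static void main(String[] args) {
        ToyNode processed = process(Arrays.asList("a", "b"));
        ToyNode filtered = filter(processed, s -> true);

        System.out.println(filtered.mainOutput());      // [a]
        System.out.println(filtered.sideOutput(SIDE));  // [] (the reported symptom)
        System.out.println(processed.sideOutput(SIDE)); // [b]
    }
}
```

In Flink terms, the corresponding workaround is to call getSideOutput() on the variable holding the process() result and apply filter() to the main output separately.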

Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Chen Qin <qi...@gmail.com>.
Thanks Chesnay,

So I think that to support a multi-input, multi-output model like the one the Dataflow paper describes, Flink needs credit-based scheduling and side inputs to be ready, plus a new set of DataStream APIs that isn't constrained by backwards-compatibility issues. Only then can we pass side outputs through to the next operator and let the consumer decide what to do with them.

Yes, it might be a long way off, but it seems like one of the directions the community could consider.

Chen


> On Jan 16, 2018, at 5:18 AM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> I've opened https://issues.apache.org/jira/browse/FLINK-8437
> 
> Unfortunately I doubt we can fix this properly. The proposed solution will not work if we ever allow arbitrary functions to use side-outputs.
> 
>> On 16.01.2018 08:59, Juho Autio wrote:
>> Could someone with knowledge of the right terms create this in JIRA, please? I guess I could also create it if needed..
>> 
>>> On Mon, Jan 15, 2018 at 3:15 PM, Chesnay Schepler <ch...@apache.org> wrote:
>>> Yes, I meant that process() returns the special operator. This would definitely deserve a JIRA issue.
>>> 
>>> 
>>> On 15.01.2018 14:09, Juho Autio wrote:
>>>> Thanks for the explanation. Did you mean that process() would return a SingleOutputWithSideOutputOperator?
>>>> 
>>>> Anyway, that should be enough to avoid the problem that I hit (and it also seems like the best & only way).
>>>> 
>>>> Maybe the name should be something more generic though, like ProcessedSingleOutputOperator or something, I wouldn't know..
>>>> 
>>>> Would this deserve an improvement ticket in JIRA?
>>>> 
>>>> On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ch...@apache.org> wrote:
>>>>> It would mean that getSideOutput() would return a SingleOutputWithSideOutputOperator which extends SingleOutputOperator, offering getSideOutput(). Other transformations would still return a SingleOutputOperator.
>>>>> 
>>>>> With this, the following code wouldn't compile:
>>>>> 
>>>>> stream
>>>>>     .process(...)
>>>>>     .filter(...)
>>>>>     .getSideOutput(...) // compile error
>>>>> 
>>>>> You would have to explicitly define the code as below, which makes the behavior unambiguous:
>>>>> 
>>>>> processed = stream
>>>>>     .process(...)
>>>>> 
>>>>> filtered = processed
>>>>>     .filter(...)
>>>>> 
>>>>> filteredSideOutput = processed
>>>>>     .getSideOutput(...)
>>>>>     .filter(...)
>>>>> 
>>>>> 
>>>>> On 15.01.2018 09:55, Juho Autio wrote:
>>>>>> > side output might deserve a separate class which inherits from the single-output operator. It might prevent a lot of confusion
>>>>>> 
>>>>>> Thanks, but how could that be done? Do you mean that if one calls .process(), then the stream would change to another class which would only allow calls like .getMainOutput() or .getSideOutput("name")? Of course a compile-time error would be even better than a runtime error, but I don't see yet how it could be done in practice.
>>>>>> 
>>>>>>> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qi...@gmail.com> wrote:
>>>>>>> Hi Juho,
>>>>>>> 
>>>>>>> I think side output might deserve a separate class which inherits from the single-output operator. It might prevent a lot of confusion. A more generic question is whether the DataStream API can natively support multiple inputs and multiple outputs. It becomes more of a scheduling problem when you go from a single-process system to a multi-process system: which operator should get resources, and how should they share the same hardware? I guess it will open the gate to lots of edge cases -> strategies -> more edge cases :)
>>>>>>> 
>>>>>>> Chen
>>>>>>> 
>>>>>>>> On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <ju...@rovio.com> wrote:
>>>>>>>> Maybe I could express it in a slightly different way: if adding the .filter() after .process() causes the side output to be somehow totally "lost", then I believe .getSideOutput() could detect that there is no such side output to listen to from upstream, and throw an exception. I mean, this should be possible when building the DAG; detecting it shouldn't require starting the stream? Thanks..
>>>>>>>> 
>>>>>>>> On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
>>>>>>>>> Hi Juho,
>>>>>>>>> 
>>>>>>>>>> Now that I think of it this seems like a bug to me: why does the call to getSideOutput succeed if it doesn't provide _any_ input?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> With the way side outputs work, I don’t think this is possible (or would make sense). An operator does not know whether or not it would ever emit some element with a given tag.
>>>>>>>>> As far as I understand it, calling `getSideOutput` essentially adds a virtual node to the result stream graph that listens to the specified tag from the upstream input.
>>>>>>>>> 
>>>>>>>>> While I’m not sure whether your observation is expected behavior, from an API perspective I can see why it is a bit confusing for you.
>>>>>>>>> Aljoscha would be the expert here; maybe he’ll have more insights. I’ve cc’ed him.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Gordon
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> 
> 
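Chesnay's proposal, quoted above, can be sketched in plain Java to show how the compiler would reject the ambiguous chain. Every name here (ToyStream, ToyProcessedStream, ProposalDemo, the "main" tag) is a hypothetical stand-in rather than a Flink class, and process() is simplified to take a function that returns the tag for each element. The assumption, as in the proposal, is that only process() returns the type exposing getSideOutput(), so calling it after filter() fails to compile instead of silently yielding an empty stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the proposal, NOT Flink's API. A plain stream
// offers ordinary transformations but no getSideOutput().
class ToyStream {
    static final String MAIN = "main";
    final List<String> elements;

    ToyStream(List<String> elements) {
        this.elements = elements;
    }

    // Like other transformations, filter() returns a plain ToyStream.
    ToyStream filter(Predicate<String> predicate) {
        List<String> kept = new ArrayList<>();
        for (String s : elements) {
            if (predicate.test(s)) {
                kept.add(s);
            }
        }
        return new ToyStream(kept);
    }

    // Only process() returns the richer type. Simplification: the function
    // returns the tag each element is routed to (MAIN for the main output).
    ToyProcessedStream process(Function<String, String> tagFor) {
        List<String> main = new ArrayList<>();
        Map<String, List<String>> side = new HashMap<>();
        for (String s : elements) {
            String tag = tagFor.apply(s);
            if (MAIN.equals(tag)) {
                main.add(s);
            } else {
                side.computeIfAbsent(tag, k -> new ArrayList<>()).add(s);
            }
        }
        return new ToyProcessedStream(main, side);
    }
}

// Plays the role of the proposed "SingleOutputWithSideOutputOperator":
// the only type on which getSideOutput() exists.
class ToyProcessedStream extends ToyStream {
    private final Map<String, List<String>> sideOutputs;

    ToyProcessedStream(List<String> main, Map<String, List<String>> sideOutputs) {
        super(main);
        this.sideOutputs = sideOutputs;
    }

    ToyStream getSideOutput(String tag) {
        return new ToyStream(sideOutputs.getOrDefault(tag, Collections.emptyList()));
    }
}

class ProposalDemo {
    public static void main(String[] args) {
        ToyStream stream = new ToyStream(Arrays.asList("a", "b"));

        // Does not compile: filter() returns ToyStream, which has no getSideOutput().
        // stream.process(s -> "a".equals(s) ? ToyStream.MAIN : "side-output")
        //       .filter(s -> true)
        //       .getSideOutput("side-output");

        // The explicit form from the thread: keep the process() result and
        // branch from it.
        ToyProcessedStream processed =
                stream.process(s -> "a".equals(s) ? ToyStream.MAIN : "side-output");
        ToyStream filtered = processed.filter(s -> true);
        ToyStream filteredSideOutput =
                processed.getSideOutput("side-output").filter(s -> true);

        System.out.println(filtered.elements);           // [a]
        System.out.println(filteredSideOutput.elements); // [b]
    }
}
```

The design choice is that the type, not a runtime check, decides where getSideOutput() may be called; the cost is that every operator able to emit side outputs needs its own richer return type.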

Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Chesnay Schepler <ch...@apache.org>.
I've opened https://issues.apache.org/jira/browse/FLINK-8437

Unfortunately I doubt we can fix this properly. The proposed solution 
will not work if we ever allow arbitrary functions to use side-outputs.
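Juho's alternative in this thread, rejecting getSideOutput() while the graph is being built, depends on the same condition Chesnay raises for the typed-operator proposal. The sketch below is hypothetical (ToyGraphNode, its mayEmitSideOutputs flag and BuildTimeCheckDemo are illustration only, not Flink's API): the check works only because each node declares up front whether it can emit side outputs. If arbitrary functions were allowed to use side outputs, every node would have to declare itself capable, and the check could no longer reject anything.

```java
// Hypothetical graph-building model, NOT Flink's API.
class ToyGraphNode {
    final String name;
    // Declared when the node is created; in this sketch only process-like
    // operators set it to true.
    final boolean mayEmitSideOutputs;

    ToyGraphNode(String name, boolean mayEmitSideOutputs) {
        this.name = name;
        this.mayEmitSideOutputs = mayEmitSideOutputs;
    }

    // Models getSideOutput(): returns a virtual node listening to `tag` on
    // this node, or fails while the graph is being built if this node can
    // never emit side outputs.
    ToyGraphNode getSideOutput(String tag) {
        if (!mayEmitSideOutputs) {
            throw new IllegalStateException(
                    "Operator '" + name + "' cannot emit side output '" + tag + "'");
        }
        return new ToyGraphNode(name + "[" + tag + "]", false);
    }
}

class BuildTimeCheckDemo {
    public static void main(String[] args) {
        ToyGraphNode process = new ToyGraphNode("process", true);
        ToyGraphNode filter = new ToyGraphNode("filter", false);

        System.out.println(process.getSideOutput("side-output").name); // process[side-output]
        try {
            filter.getSideOutput("side-output");
        } catch (IllegalStateException e) {
            // Operator 'filter' cannot emit side output 'side-output'
            System.out.println(e.getMessage());
        }
    }
}
```

Setting mayEmitSideOutputs to true for every node makes getSideOutput() accept every call, which is the same silent behavior described at the top of the thread.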



Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Juho Autio <ju...@rovio.com>.
Could someone with knowledge of the right terms create this in JIRA,
please? I guess I could also create it if needed..


Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Chesnay Schepler <ch...@apache.org>.
Yes, I meant that process() returns the special operator. This would 
definitely deserve a JIRA issue.

On 15.01.2018 14:09, Juho Autio wrote:
> Thanks for the explanation. Did you mean that process() would return 
> a SingleOutputWithSideOutputOperator?
>
> Anyway, that should be enough to avoid the problem that I hit (and it 
> also seems like the best & only way).
>
> Maybe the name should be something more generic though, like 
> ProcessedSingleOutputOperator or something, I wouldn't know..
>
> Would this deserve an improvement ticket in JIRA?
>
> On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <chesnay@apache.org 
> <ma...@apache.org>> wrote:
>
>     It would mean that getSideOutput() would return a
>     SingleOutputWithSideOutputOperator which extends
>     SingleOutputOperator, offering getSideOutput(). Other
>     transformations would still return a SingleOutputOperator.
>
>     With this, the following code wouldn't compile:
>
>     stream
>         .process(...)
>         .filter(...)
>         .getSideOutput(...) // compile error
>
>     You would have to explicitly define the code as below, which makes
>     the behavior unambiguous:
>
>     processed = stream
>         .process(...)
>
>     filtered = processed
>         .filter(...)
>
>     filteredSideOutput = processed
>         .getSideOutput(...)
>         .filter(...)
>
>
>     On 15.01.2018 09:55, Juho Autio wrote:
>>     > side output might deserve a separate class which inherits from
>>     the single-output operator. It might prevent a lot of confusion
>>
>>     Thanks, but how could that be done? Do you mean that if one calls
>>     .process(), then the stream would change to another class which
>>     would only allow calls like .getMainOutput() or
>>     .getSideOutput("name")? Of course a compile-time error would be
>>     even better than a runtime error, but I don't see yet how it
>>     could be done in practice.
>>
>>     On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qinnchen@gmail.com
>>     <ma...@gmail.com>> wrote:
>>
>>         Hi Juho,
>>
>>         I think side output might deserve a separate class which
>>         inherits from the single-output operator. It might prevent a
>>         lot of confusion. A more generic question is whether the
>>         DataStream API can natively support multiple inputs and
>>         multiple outputs. It becomes more of a scheduling problem when
>>         you go from a single-process system to a multi-process
>>         system: which operator should get resources, and how should
>>         they share the same hardware? I guess it will open the gate to
>>         lots of edge cases -> strategies -> more edge cases :)
>>
>>         Chen
>>
>>         On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio
>>         <juho.autio@rovio.com <ma...@rovio.com>> wrote:
>>
>>             Maybe I could express it in a slightly different way: if
>>             adding the .filter() after .process() causes the side
>>             output to be somehow totally "lost", then I believe
>>             .getSideOutput() could detect that there is no such side
>>             output to listen to from upstream, and throw an exception.
>>             I mean, this should be possible when building the DAG;
>>             detecting it shouldn't require starting the stream?
>>             Thanks..
>>
>>             On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
>>             <tzulitai@apache.org <ma...@apache.org>> wrote:
>>
>>                 Hi Juho,
>>
>>>                 Now that I think of it this seems like a bug to me:
>>>                 why does the call to getSideOutput succeed if it
>>>                 doesn't provide _any_ input?
>>
>>                 With the way side outputs work, I don’t think this is
>>                 possible (or would make sense). An operator does not
>>                 know whether or not it would ever emit some element
>>                 with a given tag.
>>                 As far as I understand it, calling `getSideOutput`
>>                 essentially adds a virtual node to the result stream
>>                 graph that listens to the specified tag from the
>>                 upstream input.
>>
>>                 While I’m not aware whether or not your observation
>>                 is expected behavior, from an API perspective, I can
>>                 see why it is a bit confusing for you.
>>                 Aljoscha would be the expert here, maybe he’ll have
>>                 more insights. I’ve looped him in cc’ed.
>>
>>                 Cheers,
>>                 Gordon
>>
>>
>>                 On 12 January 2018 at 4:05:13 PM, Juho Autio
>>                 (juho.autio@rovio.com <ma...@rovio.com>)
>>                 wrote:
>>
>>>                 When I run the code below (Flink 1.4.0 or 1.3.1),
>>>                 only "a" is printed. If I switch the position of
>>>                 .process() & .filter() (ie. filter first, then
>>>                 process), both "a" & "b" are printed, as expected.
>>>
>>>                 I guess it's a bit hard to say what the side output
>>>                 should include in this case: the stream before
>>>                 filtering or after it?
>>>
>>>                 What I would suggest is Flink to protect against
>>>                 this kind of a user error that is hard to debug.
>>>                 Would it be possible that Flink throws an exception
>>>                 if one tries to call .getSideOutput() on anything
>>>                 that doesn't actually provide that side output? Now
>>>                 that I think of it this seems like a bug to me: why
>>>                 does the call to getSideOutput succeed if it doesn't
>>>                 provide _any_ input? I would expect it to get the
>>>                 side output data stream _before_ applying .filter().
>>>
>>>                 import
>>>                 org.apache.flink.api.common.functions.FilterFunction;
>>>                 import
>>>                 org.apache.flink.streaming.api.datastream.DataStreamSource;
>>>                 import
>>>                 org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>                 import
>>>                 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>                 import
>>>                 org.apache.flink.streaming.api.functions.ProcessFunction;
>>>                 import org.apache.flink.util.Collector;
>>>                 import org.apache.flink.util.OutputTag;
>>>
>>>                 public class SideOutputProblem {
>>>
>>>                 public static void main(String[] args) throws
>>>                 Exception {
>>>
>>>                 StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>                 DataStreamSource<String> stream =
>>>                 env.fromElements("a", "b");
>>>                 OutputTag<String> sideOutputTag = new
>>>                 OutputTag<String>("side-output"){};
>>>
>>>                 SingleOutputStreamOperator<String> processed = stream
>>>
>>>                 .process(new ProcessFunction<String, String>() {
>>>                 @Override
>>>                 public void processElement(String s, Context
>>>                 context, Collector<String> collector) throws Exception {
>>>                 if ("a".equals(s)) {
>>>                 collector.collect(s);
>>>                 } else {
>>>                 context.output(sideOutputTag, s);
>>>                 }
>>>                           }
>>>                       })
>>>
>>>                 .filter(new FilterFunction<String>() {
>>>                 @Override
>>>                 public boolean filter(String s) throws Exception {
>>>                 return true;
>>>                           }
>>>                       });
>>>
>>>                 processed.getSideOutput(sideOutputTag).printToErr();
>>>
>>>                 processed.print();
>>>
>>>                 env.execute();
>>>                     }
>>>
>>>                 }
>>>
>>>                 Cheers,
>>>                 Juho
>>
>>
>>
>>
>


Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Juho Autio <ju...@rovio.com>.
Thanks for the explanation. Did you mean that process() would return a
SingleOutputWithSideOutputOperator?

Anyway, that should be enough to avoid the problem that I hit (and it also
seems like the best and only way).

Maybe the name should be something more generic, though, like
ProcessedSingleOutputOperator or something similar; I'm not sure.

Would this deserve an improvement ticket in JIRA?

On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler <ch...@apache.org>
wrote:

> It would mean that getSideOutput() would return a
> SingleOutputWithSideOutputOperator which extends SingleOutputOperator
> offering getSideOutput(). Other transformations would still return a
> SingleOutputOperator.
>
> With this the following code wouldn't compile.
>
> stream
>     .process(...)
>     .filter(...)
>     .getSideOutput(...) // compile error
>
> You would have to explicitly define the code as below, which makes the
> behavior unambiguous:
>
> processed = stream
>     .process(...)
>
> filtered = processed
>     .filter(...)
>
> filteredSideOutput = processed
>     .getSideOutput(...)
>     .filter(...)
>

Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Chesnay Schepler <ch...@apache.org>.
It would mean that getSideOutput() would return a 
SingleOutputWithSideOutputOperator which extends SingleOutputOperator 
offering getSideOutput(). Other transformations would still return a 
SingleOutputOperator.

With this the following code wouldn't compile.

stream
     .process(...)
     .filter(...)
     .getSideOutput(...) // compile error

You would have to explicitly define the code as below, which makes the 
behavior unambiguous:

processed = stream
     .process(...)

filtered = processed
     .filter(...)

filteredSideOutput = processed
     .getSideOutput(...)
     .filter(...)
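A minimal, self-contained sketch of the typing idea above. It uses hypothetical stand-in classes (SingleOutput, SingleOutputWithSideOutput, SideOutputTypes), not Flink's real SingleOutputStreamOperator, assumes that only the process step returns the richer type, and omits the OutputTag argument for brevity. Because filter() returns the plain type, calling getSideOutput() after a filter is rejected by the compiler instead of silently producing an empty stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-ins for the proposal; these are NOT Flink classes.
class SingleOutput {
    final List<String> main;

    SingleOutput(List<String> main) {
        this.main = main;
    }

    // Ordinary transformations return the plain type, which has no getSideOutput().
    SingleOutput filter(Predicate<String> predicate) {
        List<String> kept = new ArrayList<>();
        for (String s : main) {
            if (predicate.test(s)) {
                kept.add(s);
            }
        }
        return new SingleOutput(kept);
    }
}

// Only the result of process() offers getSideOutput().
class SingleOutputWithSideOutput extends SingleOutput {
    private final List<String> side;

    SingleOutputWithSideOutput(List<String> main, List<String> side) {
        super(main);
        this.side = side;
    }

    SingleOutput getSideOutput() {
        return new SingleOutput(side);
    }
}

public class SideOutputTypes {

    // Stand-in for the ProcessFunction in this thread: "a" goes to the main
    // output, everything else goes to the side output.
    static SingleOutputWithSideOutput process(List<String> input) {
        List<String> main = new ArrayList<>();
        List<String> side = new ArrayList<>();
        for (String s : input) {
            if ("a".equals(s)) {
                main.add(s);
            } else {
                side.add(s);
            }
        }
        return new SingleOutputWithSideOutput(main, side);
    }

    public static void main(String[] args) {
        SingleOutputWithSideOutput processed = process(List.of("a", "b"));

        SingleOutput filtered = processed.filter(s -> true);
        SingleOutput filteredSideOutput = processed.getSideOutput().filter(s -> true);

        // filtered.getSideOutput();  // would not compile: SingleOutput has no such method

        System.out.println(filtered.main);           // [a]
        System.out.println(filteredSideOutput.main); // [b]
    }
}
```

The design choice here is to move the mistake from runtime (an empty side output) to compile time, at the cost of one extra public type.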

On 15.01.2018 09:55, Juho Autio wrote:
> > sideoutput might deserve a separate class which inherits from
> > singleoutput. It might prevent a lot of confusion
>
> Thanks, but how could that be done? Do you mean that if one calls 
> .process(), then the stream would change to another class which would 
> only allow calls like .getMainOutput() or .getSideOutput("name")? Of 
> course compile time error would be even better than a runtime error, 
> but I don't see yet how it could be done in practice.


Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Juho Autio <ju...@rovio.com>.
> sideoutput might deserve a separate class which inherits from
> singleoutput. It might prevent a lot of confusion

Thanks, but how could that be done? Do you mean that if one calls
.process(), then the stream would change to another class which would only
allow calls like .getMainOutput() or .getSideOutput("name")? Of course a
compile-time error would be even better than a runtime error, but I don't
see yet how it could be done in practice.

On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin <qi...@gmail.com> wrote:

> Hi Juho,
>
> I think sideoutput might deserve a separate class which inherits from
> singleoutput. It might prevent a lot of confusion. A more generic question
> is whether the DataStream API can support multiple inputs and multiple
> outputs natively. It's more like a scheduling problem when you go from a
> single-process system to a multi-process system: which one should get
> resources, and how much, when sharing the same hardware. I guess it will
> open the gate to lots of edge cases -> strategies -> more edge cases :)
>
> Chen

Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Chen Qin <qi...@gmail.com>.
Hi Juho,

I think sideoutput might deserve a separate class which inherits from
singleoutput. It might prevent a lot of confusion. A more generic question
is whether the DataStream API can support multiple inputs and multiple
outputs natively. It's more like a scheduling problem when you go from a
single-process system to a multi-process system: which one should get
resources, and how much, when sharing the same hardware. I guess it will
open the gate to lots of edge cases -> strategies -> more edge cases :)

Chen

On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio <ju...@rovio.com> wrote:

> Maybe I could express it in a slightly different way: if adding the
> .filter() after .process() causes the side output to be somehow totally
> "lost", then I believe the .getSideOutput() could be aware that there is
> not such side output to be listened to from upstream, and throw an
> exception. I mean, this should be possible when building the DAG, it
> shouldn't require starting the stream to detect? Thanks..

Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by Juho Autio <ju...@rovio.com>.
Maybe I could express it in a slightly different way: if adding the
.filter() after .process() causes the side output to be somehow totally
"lost", then I believe .getSideOutput() could be aware that there is no
such side output to listen to from upstream, and throw an exception.
Shouldn't this be detectable when building the DAG, without having to
start the stream? Thanks.
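One way the build-time check suggested above could look, as a hypothetical toy: SideOutputCheck and Node are made up for illustration and are not a Flink API. The sketch assumes every operator declares up front which tags it may emit to; per Gordon's reply quoted below, Flink operators carry no such information, so this only illustrates the idea rather than something Flink offers.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy graph builder; not a Flink API.
public class SideOutputCheck {

    static final class Node {
        final String name;
        private final Set<String> declaredTags = new HashSet<>();

        Node(String name) {
            this.name = name;
        }

        // Assumption: an operator must declare each tag it may emit to.
        Node declares(String tag) {
            declaredTags.add(tag);
            return this;
        }

        // Fails while the graph is being built instead of yielding a silently empty stream.
        Node getSideOutput(String tag) {
            if (!declaredTags.contains(tag)) {
                throw new IllegalArgumentException(
                        "operator '" + name + "' declares no side output '" + tag + "'");
            }
            return new Node(name + "[" + tag + "]");
        }
    }

    public static void main(String[] args) {
        Node processed = new Node("process").declares("side-output");
        Node filtered = new Node("filter"); // a filter declares no tags

        System.out.println(processed.getSideOutput("side-output").name); // process[side-output]
        try {
            filtered.getSideOutput("side-output");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```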

On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Juho,
>
>> Now that I think of it this seems like a bug to me: why does the call to
>> getSideOutput succeed if it doesn't provide _any_ input?
>
>
> With the way side outputs work, I don’t think this is possible (or would
> make sense). An operator does not know whether or not it would ever emit
> some element with a given tag.
> As far as I understand it, calling `getSideOutput` essentially adds a
> virtual node to the result stream graph that listens to the specified tag
> from the upstream input.
>
> While I’m not aware whether or not your observation is expected behavior,
> from an API perspective, I can see why it is a bit confusing for you.
> Aljoscha would be the expert here, maybe he’ll have more insights. I’ve
> looped him in cc’ed.
>
> Cheers,
> Gordon

Re: SideOutput doesn't receive anything if filter is applied after the process function

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Juho,

> Now that I think of it this seems like a bug to me: why does the call to getSideOutput succeed if it doesn't provide _any_ input?

With the way side outputs work, I don’t think this is possible (or that it would make sense). An operator cannot know in advance whether it will ever emit an element with a given tag.
As far as I understand it, calling `getSideOutput` essentially adds a virtual node to the resulting stream graph that listens for the specified tag on the upstream operator, i.e. the operator on which `getSideOutput` was called.
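To make the point above concrete, here is a small toy model in plain Java. It is not Flink's implementation: `SideOutputToyModel`, the nested `OperatorResult` type, and the `process`/`filter` helpers are hypothetical stand-ins. The model only assumes what is described above: a side output is tied to the operator it is requested from, and a filter never emits tagged records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Toy model (NOT Flink code): a side output is read from the specific operator
 * that emitted it. All class and method names here are hypothetical.
 */
public class SideOutputToyModel {

    /** What one operator produced: a main output plus tagged side outputs. */
    static final class OperatorResult {
        final List<String> mainOutput = new ArrayList<>();
        final Map<String, List<String>> sideOutputs = new HashMap<>();

        /** Stand-in for getSideOutput(tag): only looks at THIS operator's tagged records. */
        List<String> getSideOutput(String tag) {
            // An unknown tag is not an error; the operator simply never emitted it.
            return sideOutputs.getOrDefault(tag, List.of());
        }
    }

    /** Mirrors the ProcessFunction in the thread: "a" goes to the main output, anything else to the tag. */
    static OperatorResult process(List<String> input, String tag) {
        OperatorResult result = new OperatorResult();
        for (String s : input) {
            if ("a".equals(s)) {
                result.mainOutput.add(s);
            } else {
                result.sideOutputs.computeIfAbsent(tag, k -> new ArrayList<>()).add(s);
            }
        }
        return result;
    }

    /** A filter forwards (some of) its input to its main output and never emits tagged records. */
    static OperatorResult filter(List<String> input, Predicate<String> keep) {
        OperatorResult result = new OperatorResult();
        for (String s : input) {
            if (keep.test(s)) {
                result.mainOutput.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String tag = "side-output";
        OperatorResult processed = process(List.of("a", "b"), tag);
        OperatorResult filtered = filter(processed.mainOutput, s -> true);

        // Like the original program: the side output is requested from the filter.
        System.out.println("filter side output:  " + filtered.getSideOutput(tag));   // []
        // Requested from the process step instead, the tagged record is there.
        System.out.println("process side output: " + processed.getSideOutput(tag)); // [b]
        System.out.println("main output:         " + filtered.mainOutput);           // [a]
    }
}
```

In the Flink program from this thread, the analogous change would be to keep the `SingleOutputStreamOperator<String>` returned by `.process(...)`, call `getSideOutput(sideOutputTag)` on that reference, and apply `.filter(...)` to it separately.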

While I’m not sure whether what you observed is expected behavior, I can see why it is confusing from an API perspective.
Aljoscha is the expert here; maybe he’ll have more insight. I’ve cc’ed him.

Cheers,
Gordon
