Posted to dev@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2017/02/23 14:42:11 UTC

[DISCUSS] Side Outputs and Split/Select

Hi Folks,
Chen and I have been working for a while now on making FLIP-13 (side
outputs) [1] a reality. We think we have a pretty good internal
implementation and also a proposal for an API but now we need to discuss
how we want to go forward with this, especially how we should deal with
split/select which does some of the same things side outputs can do. I'll
first quickly describe what the split/select API looks like, so that we're
all on the same page. Then I'll present the new proposed side output API
and then I'll present a new API for getting dropped late data from a windowed
operation, which was the original motivation for adding side outputs.

Split/select consists of two API calls: DataStream.split(OutputSelector)
and SplitStream.select(). You can use it like this:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3);

final String EVEN_SELECTOR = "even";
final String ODD_SELECTOR = "odd";

SplitStream<Integer> split = input.split(
        new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                if (value % 2 == 0) {
                    return Collections.singleton(EVEN_SELECTOR);
                } else {
                    return Collections.singleton(ODD_SELECTOR);
                }
            }
        });

DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
DataStream<Integer> oddStream = split.select(ODD_SELECTOR);

The stream is split according to an OutputSelector that returns an Iterable
of Strings. Then you can use select() to get a new stream that only
contains elements with the given selector. Notice how the element type for
all the split streams is the same.

The new side output API proposal adds a new type OutputTag<T> and relies on
extending ProcessFunction to allow emitting data to outputs besides the
main output. I think it's best explained with an example as well:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3);

final OutputTag<String> sideOutput1 = new OutputTag<String>("side-output-1"){};
final OutputTag<Integer> sideOutput2 = new OutputTag<Integer>("side-output-2"){};

SingleOutputStreamOperator<String> mainOutputStream = input
        .process(new ProcessFunction<Integer, String>() {

            @Override
            public void processElement(
                    Integer value,
                    Context ctx,
                    Collector<String> out) throws Exception {

                ctx.output(sideOutput1, "WE GOT: " + value);
                ctx.output(sideOutput2, value);
                out.collect("MAIN OUTPUT: " + value);
            }

        });

DataStream<String> sideOutputStream1 =
        mainOutputStream.getSideOutput(sideOutput1);
DataStream<Integer> sideOutputStream2 =
        mainOutputStream.getSideOutput(sideOutput2);

Notice how the OutputTags are anonymous inner classes, similar to TypeHint.
We need this to be able to analyse the type of the side-output streams.
Also notice how the types of the side-output streams can be independent of
the main-output stream, and how everything is correctly type checked by the
Java compiler.
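
To illustrate why the anonymous inner class matters, here is a minimal
sketch in plain Java. The Tag class is a hypothetical stand-in, not the
proposed OutputTag itself; it only shows the general trick of reading the
type argument back from the generic superclass of an anonymous subclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TagTypeCaptureSketch {

    /** Hypothetical stand-in for a typed tag; not Flink's actual class. */
    public abstract static class Tag<T> {
        private final String id;
        private final Type elementType;

        protected Tag(String id) {
            this.id = id;
            // An anonymous subclass records its generic superclass (e.g. Tag<String>)
            // in its class file, so the type argument survives erasure.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                        "Create the tag as an anonymous subclass: new Tag<X>(\"id\"){}");
            }
            this.elementType = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        public String getId() {
            return id;
        }

        public Type getElementType() {
            return elementType;
        }
    }

    public static void main(String[] args) {
        Tag<String> strings = new Tag<String>("side-output-1") {};
        Tag<Integer> ints = new Tag<Integer>("side-output-2") {};
        System.out.println(strings.getId() + " -> " + strings.getElementType().getTypeName());
        System.out.println(ints.getId() + " -> " + ints.getElementType().getTypeName());
    }
}
```

Without the trailing {} there would be no subclass to inspect, and the
element type would be erased at runtime.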

This change requires making ProcessFunction an abstract base class so that
not every user has to implement the onTimer() method. We would also need to
allow ProcessFunction on a non-keyed stream.
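
As a rough illustration of the abstract-base-class point, the sketch below
uses a hypothetical, heavily simplified SketchProcessFunction (a List stands
in for the Collector, and there is no Context). It only shows that a no-op
onTimer() in the base class lets simple functions override a single method.

```java
import java.util.ArrayList;
import java.util.List;

public class AbstractBaseSketch {

    /** Hypothetical simplified shape; as an interface, both methods would have to be implemented. */
    public abstract static class SketchProcessFunction<I, O> {

        public abstract void processElement(I value, List<O> out);

        /** No-op by default, so functions that never register timers can ignore it. */
        public void onTimer(long timestamp, List<O> out) {
        }
    }

    /** A user function that only overrides processElement(). */
    public static class Doubler extends SketchProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer value, List<Integer> out) {
            out.add(value * 2);
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        Doubler doubler = new Doubler();
        doubler.processElement(21, out);
        doubler.onTimer(0L, out); // inherited no-op, adds nothing
        System.out.println(out);
    }
}
```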

Chen also implemented an API based on FlatMapFunction that looks like the
one proposed in the FLIP. This relies on CollectorWrapper, which can be
used to "pimp" a Collector to also allow emitting to side outputs.

For WindowedStream we have two proposals: make OutputTag visible on the
WindowedStream API or make the result type of WindowedStream operations
more specific to allow a getDroppedDataSideOutput() method. For the first
proposal it would look like this:

final OutputTag<IN> lateDataTag = new OutputTag<IN>("side-output-1"){};

SingleOutputStreamOperator<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .sideOutputLateData(lateDataTag)
  .apply(...);

DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);

For the second proposal it would look like this:

WindowedOperator<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...);

DataStream<IN> lateData = windowedResult.getSideOutput();

Right now, the result of window operations is a
SingleOutputStreamOperator<T>, same as it is for all DataStream operations.
Making the result type more specific, i.e. a WindowedOperator, would allow
us to add extra methods there. This would require wrapping a
SingleOutputStreamOperator and forwarding all the method calls to the
wrapped operator, which can be a bit of a hassle for future changes. The
first proposal requires additional boilerplate code.

Sorry for the long mail but I think it's necessary to get everyone on the
same page. The question is now: how should we proceed with the proposed API
and the old split/select API? I propose to deprecate split/select and only
have side outputs, going forward. Of course, I'm a bit biased on this. ;-)
If we decide to do this, we also need to decide on what the side output API
should look like.

Happy discussing! Feedback very welcome. :-)

Best,
Aljoscha

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Jamie Grier <ja...@data-artisans.com>.
+1

On Sat, Mar 4, 2017 at 12:25 AM, Kostas Kloudas
<k.kloudas@data-artisans.com> wrote:

> +1
>
> > On Mar 2, 2017, at 1:08 PM, Fabian Hueske <fh...@gmail.com> wrote:
> >
> > +1
> >
> > 2017-03-02 12:11 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> >
> >> Ok, so it seems we have to go with the OutputTag variant for windows as
> >> well, for now.
> >>
> >> For Flink 2.0 we can change that. Would everyone be OK with that?


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Kostas Kloudas <k....@data-artisans.com>.
+1 

> On Mar 2, 2017, at 1:08 PM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> +1
> 
> 2017-03-02 12:11 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
> 
>> Ok, so it seems we have to go with the OutputTag variant for windows as
>> well, for now.
>> 
>> For Flink 2.0 we can change that. Would everyone be OK with that?


Re: [DISCUSS] Side Outputs and Split/Select

Posted by Fabian Hueske <fh...@gmail.com>.
+1

2017-03-02 12:11 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> Ok, so it seems we have to go with the OutputTag variant for windows as
> well, for now.
>
> For Flink 2.0 we can change that. Would everyone be OK with that?

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Aljoscha Krettek <al...@apache.org>.
Ok, so it seems we have to go with the OutputTag variant for windows as
well, for now.

For Flink 2.0 we can change that. Would everyone be OK with that?

On Thu, Mar 2, 2017 at 12:05 PM, Robert Metzger <rm...@apache.org> wrote:

> Flink enforces binary compatibility for all classes tagged with the @Public
> annotation.
> Binary compatibility allows users to execute a job against a newer Flink
> version without recompiling their job jar.
> Your change alters the return type of some methods (apply()). I think
> there's no way to do that in a binary compatible way.
>
> The only thing we could do is keep the return type as is, but return a
> WindowedOperation instance.
> Users could then manually cast the returned object to access the late
> stream.
>
> Downgrading to "source compatibility" only should fix the issue, but then
> users have to recompile their Flink jobs when upgrading the Flink version.

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Robert Metzger <rm...@apache.org>.
Flink enforces binary compatibility for all classes tagged with the @Public
annotation.
Binary compatibility allows users to execute a job against a newer Flink
version without recompiling their job jar.
Your change alters the return type of some methods (apply()). I think
there's no way to do that in a binary compatible way.

The only thing we could do is keep the return type as is, but return a
WindowedOperation instance.
Users could then manually cast the returned object to access the late
stream.

Downgrading to "source compatibility" only should fix the issue, but then
users have to recompile their Flink jobs when upgrading the Flink version.
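
To make the compatibility argument concrete, here is a small sketch in plain
Java. Operator and LateDataOperator are hypothetical stand-ins, not Flink
classes. It shows that the JVM method descriptor includes the return type,
and it shows the workaround mentioned above: keep the declared return type,
return a more specific instance, and let callers cast.

```java
import java.lang.invoke.MethodType;

public class ReturnTypeCompatSketch {

    // Hypothetical stand-ins; not Flink's actual classes.
    public static class Operator {
    }

    public static class LateDataOperator extends Operator {
        public String lateStream() {
            return "late data";
        }
    }

    /** JVM descriptor of a no-argument method with the given return type. */
    public static String descriptor(Class<?> returnType) {
        return MethodType.methodType(returnType).toMethodDescriptorString();
    }

    /** Declared return type stays Operator, but a more specific instance is returned. */
    public static Operator apply() {
        return new LateDataOperator();
    }

    public static void main(String[] args) {
        // Compiled callers link against the full descriptor, return type included,
        // so these two strings identify two different methods to the JVM.
        System.out.println(descriptor(Operator.class));
        System.out.println(descriptor(LateDataOperator.class));

        // Workaround: the signature is unchanged, the caller casts to reach the extra method.
        Operator result = apply();
        if (result instanceof LateDataOperator) {
            System.out.println(((LateDataOperator) result).lateStream());
        }
    }
}
```

Because already compiled job jars reference the old descriptor, a changed
return type typically surfaces as a NoSuchMethodError at runtime rather than
as a compile error.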

On Tue, Feb 28, 2017 at 9:37 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Chen and Aljoscha,
>
> thanks for the great proposal and work.
>
> I prefer the WindowedOperator.getLateStream() variant without explicit
> tags.
> I think it is fine to start adding side output to ProcessFunction (keyed
> and non-keyed) and window operators and see how it is picked up by users.
>
> Best, Fabian

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Chen and Aljoscha,

thanks for the great proposal and work.

I prefer the WindowedOperator.getLateStream() variant without explicit tags.
I think it is fine to start adding side output to ProcessFunction (keyed
and non-keyed) and window operators and see how it is picked up by users.

Best, Fabian


2017-02-28 15:42 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> Quick update: I created a branch where I make the result type of
> WindowedStream operations more specific:
> https://github.com/aljoscha/flink/blob/windowed-stream-result-specific/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
>
> We would need this for the "lateStream()" API without the explicit
> OutputTag.
>
> It seems the backwards compatibility checker doesn't like this and
> complains about breaking binary backwards compatibility. +Robert Metzger
> <rm...@apache.org> Do you have an idea what we could do there?

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Aljoscha Krettek <al...@apache.org>.
Quick update: I created a branch where I make the result type of
WindowedStream operations more specific:
https://github.com/aljoscha/flink/blob/windowed-stream-result-specific/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java

We would need this for the "lateStream()" API without the explicit
OutputTag.

It seems the backwards compatibility checker doesn't like this and
complains about breaking binary backwards compatibility. +Robert Metzger
<rm...@apache.org> Do you have an idea what we could do there?

On Tue, 28 Feb 2017 at 12:39 Ufuk Celebi <uc...@apache.org> wrote:

> On Tue, Feb 28, 2017 at 11:38 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
> > I see the ProcessFunction as a bit of the generalised future of FlatMap,
> > so to me it makes sense to only allow side outputs on the ProcessFunction
> > but I'm open for anything. If we decide for this I'm happy with an
> > additional method on Collector.
>
> I think it's best to restrict this to ProcessFunction after all (given
> that we allow it for non-keyed streams, etc.). ;-)
>

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Ufuk Celebi <uc...@apache.org>.
On Tue, Feb 28, 2017 at 11:38 AM, Aljoscha Krettek <al...@apache.org> wrote:
> I see the ProcessFunction as a bit of the generalised future of FlatMap, so
> to me it makes sense to only allow side outputs on the ProcessFunction but
> I'm open for anything. If we decide for this I'm happy with an additional
> method on Collector.

I think it's best to restrict this to ProcessFunction after all (given
that we allow it for non-keyed streams, etc.). ;-)

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Aljoscha Krettek <al...@apache.org>.
About 1: We can definitely go with Jamie's proposal for the late data side
output, for me this is just a name and anything that has "late" in it is
perfect!

Regarding 2: I agree, and I thought about implementing split/select on top
of side outputs and it should be easily doable. I think side outputs are
strictly more powerful.

Regarding 3: I think we should not expose side outputs on the
RuntimeContext. We only allow emitting to the main output via a Collector,
which is only available in the flatMap() call (or WindowFunction.apply(),
which is the FlatMap for windows, at least in the Stream API). If we had a
method for emitting to a side output on the RuntimeContext, we would need
(somewhat tedious) logic to determine whether the method was called from a
processing method (map(), filter(), ...) or from one of the lifecycle
methods (open(), close(), snapshot(), ...). We can add an additional side
output method to Collector. From this it follows that we could only extend
FlatMap with side output capabilities, because other user functions don't
have a Collector. I'm not against this, but Collector is a bit of a tricky
interface because it is @Public and some people might implement it. This is
a rough list of internal implementations of Collector:

WriterCollector in CombiningUnilateralSortMerger (org.apache.flink.runtime.operators.sort)
CountingOutputCollector in BinaryOperatorTestBase (org.apache.flink.runtime.operators.testutils)
RightCollector in PatternFlatSelectTimeoutWrapper in PatternStream (org.apache.flink.cep)
ChainedDriver (org.apache.flink.runtime.operators.chaining)
ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
SynchronousChainedCombineDriver (org.apache.flink.runtime.operators.chaining)
NoOpChainedDriver (org.apache.flink.runtime.operators)
ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
ChainedAllReduceDriver (org.apache.flink.runtime.operators.chaining)
ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
GroupCombineChainedDriver (org.apache.flink.runtime.operators.chaining)
ChainedTerminationCriterionDriver (org.apache.flink.runtime.operators.chaining)
WorksetUpdateOutputCollector (org.apache.flink.runtime.iterative.io)
TimestampedCollector (org.apache.flink.streaming.api.operators)
OutputCollector (org.apache.flink.runtime.operators.shipping)
DiscardingOutputCollector (org.apache.flink.runtime.operators.testutils)
LeftCollector in PatternFlatSelectTimeoutWrapper in PatternStream (org.apache.flink.cep)
CountingOutputCollector in UnaryOperatorTestBase (org.apache.flink.runtime.operators.testutils)
ListOutputCollector in DriverTestBase (org.apache.flink.runtime.operators.testutils)
TupleWrappingCollector (org.apache.flink.api.java.operators.translation)
SolutionSetObjectsUpdateOutputCollector (org.apache.flink.runtime.iterative.io)
Tuple3WrappingCollector (org.apache.flink.api.java.operators.translation)
CountingCollector (org.apache.flink.runtime.operators.util.metrics)
ListOutputCollector in UnaryOperatorTestBase (org.apache.flink.runtime.operators.testutils)
ListOutputCollector in BinaryOperatorTestBase (org.apache.flink.runtime.operators.testutils)
CountingOutputCollector in DriverTestBase (org.apache.flink.runtime.operators.testutils)
ListCollector (org.apache.flink.api.common.functions.util)
SolutionSetFastUpdateOutputCollector (org.apache.flink.runtime.iterative.io)
SolutionSetUpdateOutputCollector (org.apache.flink.runtime.iterative.io)
CopyingListCollector (org.apache.flink.api.common.functions.util)
GatheringCollector (org.apache.flink.runtime.operators.drivers)
Anonymous in initialize() in IterationTailTask (org.apache.flink.runtime.iterative.task)
TableFunctionCollector (org.apache.flink.table.runtime)
TimeWindowPropertyCollector (org.apache.flink.table.runtime.aggregate)
<anonymous> in flatSelect() in PatternStream (org.apache.flink.cep.scala)
<anonymous> in flatSelect() in PatternStream (org.apache.flink.cep.scala)

I see the ProcessFunction as a bit of the generalised future of FlatMap, so
to me it makes sense to only allow side outputs on the ProcessFunction but
I'm open for anything. If we decide for this I'm happy with an additional
method on Collector.
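
On point 2 above, the idea of expressing split/select through side outputs
can be sketched with a toy model in plain Java. Tag and Outputs below are
hypothetical helpers, not Flink classes, and the example runs over a List
instead of a stream. The "split" is an emit to one of two tags and the
"select" is a lookup by tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitOnSideOutputsSketch {

    /** Hypothetical typed tag; a toy stand-in for OutputTag<T>. */
    public static final class Tag<T> {
        final String id;

        public Tag(String id) {
            this.id = id;
        }
    }

    /** Toy side-output sink: one buffer per tag id. */
    public static final class Outputs {
        private final Map<String, List<Object>> buffers = new HashMap<>();

        public <T> void output(Tag<T> tag, T value) {
            buffers.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        @SuppressWarnings("unchecked")
        public <T> List<T> getSideOutput(Tag<T> tag) {
            // Safe only as long as each id is used with a single element type.
            List<?> raw = buffers.getOrDefault(tag.id, new ArrayList<>());
            return (List<T>) raw;
        }
    }

    public static final Tag<Integer> EVEN = new Tag<>("even");
    public static final Tag<Integer> ODD = new Tag<>("odd");

    /** The "split": the even/odd selector expressed as emits to two tags. */
    public static Outputs split(List<Integer> input) {
        Outputs outputs = new Outputs();
        for (Integer value : input) {
            outputs.output(value % 2 == 0 ? EVEN : ODD, value);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Outputs outputs = split(Arrays.asList(1, 2, 3));
        System.out.println("even: " + outputs.getSideOutput(EVEN)); // the "select"
        System.out.println("odd: " + outputs.getSideOutput(ODD));
    }
}
```

Here both tags carry Integer, which mirrors split/select. Nothing in the
model prevents a third tag with a different element type, which is the sense
in which side outputs are more general.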

On Tue, 28 Feb 2017 at 10:32 Ufuk Celebi <uc...@apache.org> wrote:

> 1. I like the variant without the explicit OutputTag for the
> WindowOperator:
>
> WindowedOperator<T> windowedResult = input
>   .keyBy(...)
>   .window(...)
>   .apply(...)
>
> DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
>
> I like Jamie's proposal getLateStream() a little better though. On the
> other hand I see that it makes sense to make it explicit that a side
> output is consumed.
>
> 2. I would keep the split/select API and deprecate it. Ideally,
> implemented on top of side outputs.
>
> 3. What about Gyula's question to expose side output for regular
> RichFunctions as well?
>
> I think it makes sense to not "force" users to the ProcessFunction in
> order to use side outputs. If on the other hand we think that the main
> use case will be the late data stream from windows then it's probably
> fine. I think we have two options for RichFunctions, either the
> runtime context or the collector, both of which are shared with the
> DataSet API. I would be OK with throwing an
> UnsupportedOperationException if a batch API user tries to access it.
>
>
>
> On Mon, Feb 27, 2017 at 8:56 PM, Jamie Grier <ja...@data-artisans.com>
> wrote:
> > Aljoscha,
> >
> > Ahh, that is much better.  As long as it's explicitly referring to late
> > data I think it's fine.  I also like the second variant where a user
> > doesn't have to explicitly create the OutputTag.
> >
> >
> >
> > On Mon, Feb 27, 2017 at 8:45 AM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> @Jamie I must have mistyped my last API proposal. This piece of code:
> >> WindowedOperator<T> windowedResult = input
> >>   .keyBy(...)
> >>   .window(...)
> >>   .apply(...)
> >>
> >> DataStream<IN> lateData = windowedResult.getSideOutput();
> >>
> >> should actually have been:
> >>
> >> WindowedOperator<T> windowedResult = input
> >>   .keyBy(...)
> >>   .window(...)
> >>   .apply(...)
> >>
> >> DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
> >>
> >> So apart from the naming it's pretty much the same as your suggestion,
> >> right? The reason why I preferred the explicit OutputTag is that we
> >> otherwise have to create another layer of OutputTags that are internal
> >> to the system so that users cannot accidentally also send data to the
> >> same side output. It just means writing more code for us and introducing
> >> the more concrete return type for the WindowedStream operations. But
> >> that's fine if y'all prefer that variant. :-)
> >>
> >> On Sat, 25 Feb 2017 at 04:19 Chen Qin <qi...@gmail.com> wrote:
> >>
> >> > Hi Jamie,
> >> >
> >> > I think it does make consuming late-arriving events more explicit! It
> >> > comes at the cost of fixing a predefined OutputTag<IN>, which the user
> >> > can neither control nor define, and of an extra UDF which essentially
> >> > filters out all main outputs and only lets the side output pass (like
> >> > a FilterFunction).
> >> >
> >> > Thanks,
> >> > Chen
> >> >
> >> > > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com>
> >> > > wrote:
> >> > >
> >> > > I prefer the ProcessFunction and side outputs solution over split()
> >> > > and select(), which I've never liked, primarily due to the lack of
> >> > > type safety; it also doesn't really seem to fit with the rest of
> >> > > Flink's API.
> >> > >
> >> > > On the late data question I strongly prefer the late data concept
> >> > > being explicit in the API.  Could we not also do something like:
> >> > >
> >> > > WindowedStream<> windowedStream = input
> >> > >  .keyBy(...)
> >> > >  .window(...);
> >> > >
> >> > > DataStream<> mainOutput = windowedStream
> >> > >  .apply(...);
> >> > >
> >> > > DataStream<> lateOutput = windowedStream
> >> > >  .lateStream();
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gy...@apache.org>
> >> > > wrote:
> >> > >
> >> > >> Hi,
> >> > >>
> >> > >> Thanks for the nice proposal, I like the idea of side outputs, and
> >> > >> it would make a lot of topologies much simpler.
> >> > >>
> >> > >> Regarding the API I think we should come up with a way of making
> >> > >> side outputs accessible from all sorts of operators in a similar
> >> > >> way. For instance through the RichFunction interface with a special
> >> > >> collector that we invalidate when the user should not be collecting
> >> > >> to it. (just a quick idea)
> >> > >>
> >> > >> I personally wouldn't deprecate the "universal" Split/Select API
> >> > >> that can be used on any DataStream in favor of functionality that
> >> > >> is only accessible through the process function or a few select
> >> > >> operators. I think the Split/Select pattern is also very nice and I
> >> > >> use it in many different contexts to get efficient multiway
> >> > >> filtering (after map/co operators, for example).
> >> > >>
> >> > >> Regards,
> >> > >> Gyula
> >> > >>
> >> > >> Regards,
> >> > >> Gyula
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > >
> >> > > Jamie Grier
> >> > > data Artisans, Director of Applications Engineering
> >> > > @jamiegrier <https://twitter.com/jamiegrier>
> >> > > jamie@data-artisans.com
> >> >
> >> >
> >>
> >
> >
> >
> > --
> >
> > Jamie Grier
> > data Artisans, Director of Applications Engineering
> > @jamiegrier <https://twitter.com/jamiegrier>
> > jamie@data-artisans.com
>

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Ufuk Celebi <uc...@apache.org>.
1. I like the variant without the explicit OutputTag for the WindowOperator:

WindowedOperator<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...)

DataStream<IN> lateData = windowedResult.getLateDataSideOutput();

I like the name from Jamie's proposal, lateStream(), a little better
though. On the other hand I see that it makes sense to make it explicit
that a side output is consumed.

2. I would keep the split/select API and deprecate it. Ideally, it would
be implemented on top of side outputs.

3. What about Gyula's question to expose side output for regular
RichFunctions as well?

I think it makes sense to not "force" users to the ProcessFunction in
order to use side outputs. If on the other hand we think that the main
use case will be the late data stream from windows then it's probably
fine. I think we have two options for RichFunctions, either the
runtime context or the collector, both of which are shared with the
DataSet API. I would be OK with throwing an
UnsupportedOperationException if a batch API user tries to access it.
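
To make point 3 a bit more concrete, here is a minimal sketch of a context
that is shared between streaming and batch functions and rejects side
outputs on the batch side. All names in it (SketchTag, SketchRuntimeContext,
StreamingSketchContext, BatchSketchContext) are hypothetical stand-ins
invented for illustration and are not Flink classes; it only models the
behaviour, it is not a proposal for the actual signatures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a typed side-output tag; not Flink's OutputTag.
final class SketchTag<T> {
    final String id;

    SketchTag(String id) {
        this.id = id;
    }
}

// Hypothetical context that both streaming and batch functions would see.
interface SketchRuntimeContext {
    <T> void output(SketchTag<T> tag, T value);
}

// Streaming variant: collects side-output elements per tag id.
final class StreamingSketchContext implements SketchRuntimeContext {
    private final Map<String, List<Object>> sideOutputs = new HashMap<>();

    @Override
    public <T> void output(SketchTag<T> tag, T value) {
        sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    // Returns what has been emitted to the given tag so far.
    List<Object> emitted(SketchTag<?> tag) {
        return sideOutputs.getOrDefault(tag.id, new ArrayList<>());
    }
}

// Batch variant: side outputs are not supported, so fail loudly.
final class BatchSketchContext implements SketchRuntimeContext {
    @Override
    public <T> void output(SketchTag<T> tag, T value) {
        throw new UnsupportedOperationException(
                "Side outputs are not available in the batch API");
    }
}
```

The trade-off is that a batch user only finds out at runtime, not at compile
time, that the call is not supported.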




Re: [DISCUSS] Side Outputs and Split/Select

Posted by Jamie Grier <ja...@data-artisans.com>.
Aljoscha,

Ahh, that is much better.  As long as it's explicitly referring to late
data I think it's fine.  I also like the second variant where a user
doesn't have to explicitly create the OutputTag.
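
For illustration, the second variant could behave roughly like the sketch
below: the result of the windowed operation keeps the late elements itself,
so the user never creates a tag. WindowedResultSketch and its methods are
hypothetical names, not Flink classes, and the lateness rule is simplified
on purpose: any element whose timestamp is not ahead of the current
watermark counts as late, whereas the real window operator would also take
the window end and the allowed lateness into account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical result type of a windowed operation; not a Flink class.
final class WindowedResultSketch<IN> {
    private final List<IN> onTime = new ArrayList<>();
    private final List<IN> late = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    // Advances event time; the watermark never moves backwards.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    // Routes an element to the main output or to the late-data output.
    void add(IN element, long timestamp) {
        if (timestamp <= watermark) {
            late.add(element);
        } else {
            onTime.add(element);
        }
    }

    List<IN> getMainOutput() {
        return onTime;
    }

    // The tag is implicit: there is exactly one late-data output.
    List<IN> getLateDataSideOutput() {
        return late;
    }
}
```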






-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Stephan Ewen <se...@apache.org>.
I like this proposal!

  - The side output API seems more powerful, because it allows different
output types.

  - It would be nice to eventually have only one construct, because
multiple variants for the same thing tend to confuse users.
  - One can probably implement split/select with side outputs as a special
case, where instead of "select(string)" one has "select(tag)".
  - As a migration step, we can keep a deprecated "select(string)" and make
it use a tag that is just the result type of the stream

  - For the window operator, I like the second variant better, which does
not require users to explicitly declare a tag
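
A rough sketch of the migration idea from the bullets above, under the
assumption that a tag is little more than a typed name: select(tag) is the
primary method and the deprecated select(string) just wraps the string in a
tag of the stream's element type. SelectTag and SplitSketch are hypothetical
names made up for this example, and plain lists stand in for streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical typed tag; a stand-in for the proposed OutputTag<T>.
final class SelectTag<T> {
    final String name;

    SelectTag(String name) {
        this.name = name;
    }
}

// Hypothetical split stream that routes each element to named outputs.
final class SplitSketch<T> {
    private final Map<String, List<T>> outputs = new HashMap<>();

    SplitSketch(Iterable<T> input, Function<T, Iterable<String>> selector) {
        for (T element : input) {
            for (String name : selector.apply(element)) {
                outputs.computeIfAbsent(name, k -> new ArrayList<>()).add(element);
            }
        }
    }

    // New-style access through a typed tag.
    List<T> select(SelectTag<T> tag) {
        return outputs.getOrDefault(tag.name, new ArrayList<>());
    }

    // Old-style access, kept as a deprecated shim that builds a tag
    // whose type is simply the element type of the stream.
    @Deprecated
    List<T> select(String name) {
        return select(new SelectTag<T>(name));
    }
}
```

Because the shim can only produce tags of the stream's own element type,
existing split/select programs keep working, while new code can move to
tags and later to differently typed side outputs.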



On Mon, Feb 27, 2017 at 5:47 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> I'm curious to know what people think about the OutputTag API for the
> general side-output implementation?
>
> One thing that might easily go overlooked is that I changed ProcessFunction
> from an interface to an abstract class so that I could provide a default
> onTimer() method. This also would require allowing ProcessFunction on a
> non-keyed stream, as I mentioned in my first mail (I hope).
>
> On Mon, 27 Feb 2017 at 17:45 Aljoscha Krettek <al...@apache.org> wrote:
>
> > @Jamie I must have mistyped my last API proposal. This piece of code:
> > WindowedOperator<T> windowedResult = input
> >   .keyBy(...)
> >   .window(...)
> >   .apply(...)
> >
> > DataStream<IN> lateData = windowedResult.getSideOutput();
> >
> > should actually have been:
> >
> > WindowedOperator<T> windowedResult = input
> >   .keyBy(...)
> >   .window(...)
> >   .apply(...)
> >
> > DataStream<IN> lateData = windowedResult.getLateDataSideOutput();
> >
> > So apart from the naming it's pretty much the same as your suggestion,
> > right? The reason why I preferred the explicit OutputTag is that we
> > otherwise have to create another layer of OutputTags that are internal to
> > the system so that users cannot accidentally also send data to the same
> > side output. It just means writing more code for use and introducing the
> > more concrete return type for the WindowedStream operations. But that's
> > fine if y'all prefer that variant. :-)
> >
> > On Sat, 25 Feb 2017 at 04:19 Chen Qin <qi...@gmail.com> wrote:
> >
> > Hi Jamie,
> >
> > I think it does make consuming late arriving events more explicit! At
> cost
> > of
> > fix a predefined OutputTag<IN> which user have no control nor definition
> > an extra UDF which essentially filter out all mainOutputs and only let
> > sideOutput pass (like filterFunction)
> >
> > Thanks,
> > Chen
> >
> > > On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com>
> > wrote:
> > >
> > > I prefer the ProcessFunction and side outputs solution over split() and
> > > select() which I've never liked primarily due to the lack of type
> safety
> > > and it also doesn't really seem to fit with the rest of Flink's API.
> > >
> > > On the late data question I strongly prefer the late data concept being
> > > explicit in the API.  Could we not also do something like:
> > >
> > > WindowedStream<> windowedStream = input
> > >  .keyBy(...)
> > >  .window(...);
> > >
> > > DataStream<> mainOutput = windowedStream
> > >  .apply(...);
> > >
> > > DataStream<> lateOutput = windowStream
> > >  .lateStream();
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gy...@apache.org> wrote:
> > >
> > >> Hi,
> > >>
> > >> Thanks for the nice proposal, I like the idea of side outputs, and it
> > would
> > >> make a lot of topologies much simpler.
> > >>
> > >> Regarding the API I think we should come up with a way of making side
> > >> otuputs accessible from all sort of operators in a similar way. For
> > >> instance through the RichFunction interface with a special collector
> > that
> > >> we invalidate when the user should not be collecting to it. (just a
> > quick
> > >> idea)
> > >>
> > >> I personally wouldn't deprecate the "universal" Split/Select API that
> > can
> > >> be used on any  DataStream in favor of functionality that is only
> > >> accessible trhough the process function/ or a few select operators. I
> > think
> > >> the Split/Select pattern is also very nice and I use it in many
> > different
> > >> contexts to get efficient multiway filtering (after map/co operators
> for
> > >> examples).
> > >>
> > >> Regards,
> > >> Gyula
> > >>
> > >> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2017. febr.
> > 23.,
> > >> Cs, 15:42):
> > >>
> > >>> Hi Folks,
> > >>> Chen and I have been working for a while now on making FLIP-13 (side
> > >>> outputs) [1] a reality. We think we have a pretty good internal
> > >>> implementation and also a proposal for an API but now we need to
> > discuss
> > >>> how we want to go forward with this, especially how we should deal
> with
> > >>> split/select which does some of the same things side outputs can do.
> > I'll
> > >>> first quickly describe what the split/select API looks like, so that
> > >> we're
> > >>> all on the same page. Then I'll present the new proposed side output
> > API
> > >>> and then I'll present new API for getting dropped late data from a
> > >> windowed
> > >>> operation, which was the original motivation for adding side outputs.
> > >>>
> > >>> Split/select consists of two API calls:
> > DataStream.split(OutputSelector)
> > >>> and SplitStream.select(). You can use it like this:
> > >>>
> > >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> > >>>
> > >>> final String EVEN_SELECTOR = "even";
> > >>> final String ODD_SELECTOR = "odd";
> > >>>
> > >>> SplitStream<Integer> split = input.split(
> > >>>        new OutputSelector<Integer>() {
> > >>>            @Override
> > >>>            public Iterable<String> select(Integer value) {
> > >>>                if (value % 2 == 0) {
> > >>>                    return Collections.singleton(EVEN_SELECTOR);
> > >>>                } else {
> > >>>                    return Collections.singleton(ODD_SELECTOR);
> > >>>                }
> > >>>            }
> > >>>        });
> > >>>
> > >>> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
> > >>> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
> > >>>
> > >>> The stream is split according to an OutputSelector that returns an
> > >> Iterable
> > >>> of Strings. Then you can use select() to get a new stream that only
> > >>> contains elements with the given selector. Notice how the element
> type
> > >> for
> > >>> all the split streams is the same.
> > >>>
> > >>> The new side output API proposal adds a new type OutputTag<T> and
> > relies
> > >> on
> > >>> extending ProcessFunction to allow emitting data to outputs besides
> the
> > >>> main output. I think it's best explained with an example as well:
> > >>>
> > >>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
> > >>>
> > >>> final OutputTag<String> sideOutput1 = new
> > OutputTag<>("side-output-1"){}
> > >> ;
> > >>> final OutputTag<Integer> sideOutput2 = new
> > OutputTag<>("side-output-2"){}
> > >> ;
> > >>>
> > >>> SingleOutputStreamOperator<String> mainOutputStream = input
> > >>>        .process(new ProcessFunction<Integer, String>() {
> > >>>
> > >>>            @Override
> > >>>            public void processElement(
> > >>>                    Integer value,
> > >>>                    Context ctx,
> > >>>                    Collector<String> out) throws Exception {
> > >>>
> > >>>                ctx.output(sideOutput1, "WE GOT: " + value);
> > >>>                ctx.output(sideOutput2, value);
> > >>>                out.collect("MAIN OUTPUT: " + value);
> > >>>            }
> > >>>
> > >>>        });
> > >>>
> > >>> DataStream<String> sideOutputStream1 =
> > >>> mainOutputStream.getSideOutput(sideOutput1);
> > >>> DataStream<Integer> sideOutputStream2 =
> > >>> mainOutputStream.getSideOutput(sideOutput2);
> > >>>
> > >>> Notice how the OutputTags are anonymous inner classes, similar to
> > >> TypeHint.
> > >>> We need this to be able to analyse the type of the side-output
> streams.
> > >>> Also notice, how the types of the side-output streams can be
> > independent
> > >> of
> > >>> the main-output stream, also notice how everything is correctly type
> > >>> checked by the Java Compiler.
> > >>>
> > >>> This change requires making ProcessFunction an abstract base class so
> > >> that
> > >>> not every user has to implement the onTimer() method. We would also
> > need
> > >> to
> > >>> allow ProcessFunction on a non-keyed stream.
> > >>>
> > >>> Chen also implemented an API based on FlatMapFunction that looks like
> > the
> > >>> one proposed in the FLIP. This relies on CollectorWrapper, which can
> be
> > >>> used to "pimp" a Collector to also allow emitting to side outputs.
> > >>>
> > >>> For WindowedStream we have two proposals: make OutputTag visible on
> the
> > >>> WindowedStream API or make the result type of WindowedStream
> operations
> > >>> more specific to allow a getDroppedDataSideOutput() method. For the
> > first
> > >>> proposal it would look like this:
> > >>>
> > >>> final OutputTag<String> lateDataTag = new
> > OutputTag<>("side-output-1"){}
> > >> ;
> > >>>
> > >>> DataStream<T> windowedResult = input
> > >>>  .keyBy(...)
> > >>>  .window(...)
> > >>>  .sideOutputLateData(lateDataTag)
> > >>>  .apply(...)
> > >>>
> > >>> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
> > >>>
> > >>> For the second proposal it would look like this:
> > >>>
> > >>> WindowedOperator<T> windowedResult = input
> > >>>  .keyBy(...)
> > >>>  .window(...)
> > >>>  .apply(...)
> > >>>
> > >>> DataStream<IN> lateData = windowedResult.getSideOutput();
> > >>>
> > >>> Right now, the result of window operations is a
> > >>> SingleOutputStreamOperator<T>, same as it is for all DataStream
> > >> operations.
> > >>> Making the result type more specific, i.e. a WindowedOperator, would
> > >> allow
> > >>> us to add extra methods there. This would require wrapping a
> > >>> SingleOutputStreamOperator and forwarding all the method calls to the
> > >>> wrapped operator which can be a bit of a hassle for future changes.
> The
> > >>> first proposal requires additional boilerplate code.
> > >>>
> > >>> Sorry for the long mail but I think it's necessary to get everyone on
> > the
> > >>> same page. The question is now: how should we proceed with the
> proposed
> > >> API
> > >>> and the old split/select API? I propose to deprecate split/select and
> > >> only
> > >>> have side outputs, going forward. Of course, I'm a bit biased on
> this.
> > >> ;-)
> > >>> If we decide to do this, we also need to decide on what the side
> output
> > >> API
> > >>> should look like.
> > >>>
> > >>> Happy discussing! Feedback very welcome. :-)
> > >>>
> > >>> Best,
> > >>> Aljoscha
> > >>>
> > >>> [1]
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > >> 13+Side+Outputs+in+Flink
> > >>>
> > >>
> > >
> > >
> > >
> > > --
> > >
> > > Jamie Grier
> > > data Artisans, Director of Applications Engineering
> > > @jamiegrier <https://twitter.com/jamiegrier>
> > > jamie@data-artisans.com
> >
> >
>

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Aljoscha Krettek <al...@apache.org>.
I'm curious to know what people think about the OutputTag API for the
general side-output implementation.

One thing that might easily be overlooked is that I changed ProcessFunction
from an interface to an abstract class so that I could provide a default
onTimer() method. This would also require allowing ProcessFunction on a
non-keyed stream, as I mentioned in my first mail (I hope).
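
As a rough illustration of the interface-to-abstract-class change, here is a
minimal sketch. SimpleProcessFunction and Doubler are hypothetical stand-ins
with simplified signatures (a plain List instead of a Collector, no Context),
not Flink's actual ProcessFunction. The point is only that a base class can
ship a no-op onTimer(), which also works on Java versions that lack interface
default methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in sketch, not Flink's ProcessFunction: why an abstract base class helps. */
public class AbstractFunctionSketch {

    /** Hypothetical base class: processElement is mandatory, onTimer is optional. */
    abstract static class SimpleProcessFunction<I, O> {
        abstract void processElement(I value, List<O> out);

        /** Default no-op so functions without timers need not override it. */
        void onTimer(long timestamp, List<O> out) {
        }
    }

    /** A user function that only overrides the one method it cares about. */
    static final class Doubler extends SimpleProcessFunction<Integer, Integer> {
        @Override
        void processElement(Integer value, List<Integer> out) {
            out.add(value * 2);
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        Doubler fn = new Doubler();
        fn.processElement(21, out);
        fn.onTimer(0L, out); // inherited no-op, leaves the output untouched
        System.out.println(out); // prints [42]
    }
}
```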


Re: [DISCUSS] Side Outputs and Split/Select

Posted by Aljoscha Krettek <al...@apache.org>.
@Jamie I must have mistyped my last API proposal. This piece of code:

WindowedOperator<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...);

DataStream<IN> lateData = windowedResult.getSideOutput();

should actually have been:

WindowedOperator<T> windowedResult = input
  .keyBy(...)
  .window(...)
  .apply(...);

DataStream<IN> lateData = windowedResult.getLateDataSideOutput();

So apart from the naming it's pretty much the same as your suggestion,
right? The reason why I preferred the explicit OutputTag is that we
otherwise have to create another layer of OutputTags that are internal to
the system so that users cannot accidentally also send data to the same
side output. It just means writing more code for us and introducing the
more concrete return type for the WindowedStream operations. But that's
fine if y'all prefer that variant. :-)
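
To make the "internal layer of OutputTags" concern concrete, here is one
possible shape of such a guard, sketched with plain Java. Everything in it is
an assumption for illustration: the reserved prefix, the registry class and
its method names are hypothetical and not part of Flink or of the proposal.

```java
import java.util.HashSet;
import java.util.Set;

/** Stand-in sketch, not Flink code: guarding system-owned side outputs. */
public class ReservedTagSketch {

    /** Hypothetical prefix reserved for tags created by the system itself. */
    static final String INTERNAL_PREFIX = "__internal-";

    private final Set<String> userTags = new HashSet<>();

    /** Registers a user-declared tag id, rejecting ids in the reserved namespace. */
    void registerUserTag(String id) {
        if (id.startsWith(INTERNAL_PREFIX)) {
            throw new IllegalArgumentException("reserved side-output id: " + id);
        }
        userTags.add(id);
    }

    /** Id the window operator might use for late data in the tag-less variant. */
    static String lateDataTagId() {
        return INTERNAL_PREFIX + "late-data";
    }

    public static void main(String[] args) {
        ReservedTagSketch registry = new ReservedTagSketch();
        registry.registerUserTag("side-output-1"); // accepted
        try {
            registry.registerUserTag(lateDataTagId());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With an explicit, user-supplied OutputTag no such reserved namespace is
needed, which is the trade-off described above.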


Re: [DISCUSS] Side Outputs and Split/Select

Posted by Chen Qin <qi...@gmail.com>.
Hi Jamie,

I think it does make consuming late-arriving events more explicit! It comes
at the cost of:

  - fixing a predefined OutputTag<IN> that users can neither control nor
define
  - an extra UDF that essentially filters out all main outputs and only lets
the side output pass (like a filter function)

Thanks,
Chen

> On Feb 24, 2017, at 1:17 PM, Jamie Grier <ja...@data-artisans.com> wrote:
> 
> I prefer the ProcessFunction and side outputs solution over split() and
> select() which I've never liked primarily due to the lack of type safety
> and it also doesn't really seem to fit with the rest of Flink's API.
> 
> On the late data question I strongly prefer the late data concept being
> explicit in the API.  Could we not also do something like:
> 
> WindowedStream<> windowedStream = input
>  .keyBy(...)
>  .window(...);
> 
> DataStream<> mainOutput = windowedStream
>  .apply(...);
> 
> DataStream<> lateOutput = windowedStream
>  .lateStream();
> 
> On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra <gy...@apache.org> wrote:
> 
>> Hi,
>> 
>> Thanks for the nice proposal, I like the idea of side outputs, and it would
>> make a lot of topologies much simpler.
>> 
>> Regarding the API I think we should come up with a way of making side
>> outputs accessible from all sorts of operators in a similar way. For
>> instance through the RichFunction interface with a special collector that
>> we invalidate when the user should not be collecting to it. (just a quick
>> idea)
>> 
>> I personally wouldn't deprecate the "universal" Split/Select API that can
>> be used on any DataStream in favor of functionality that is only
>> accessible through the process function or a few select operators. I think
>> the Split/Select pattern is also very nice and I use it in many different
>> contexts to get efficient multiway filtering (after map/co operators, for
>> example).
>> 
>> Regards,
>> Gyula
>> 
>> On Thu, Feb 23, 2017 at 15:42, Aljoscha Krettek <al...@apache.org> wrote:
>> 
>>> Hi Folks,
>>> Chen and I have been working for a while now on making FLIP-13 (side
>>> outputs) [1] a reality. We think we have a pretty good internal
>>> implementation and also a proposal for an API but now we need to discuss
>>> how we want to go forward with this, especially how we should deal with
>>> split/select which does some of the same things side outputs can do. I'll
>>> first quickly describe what the split/select API looks like, so that we're
>>> all on the same page. Then I'll present the new proposed side output API
>>> and then I'll present new API for getting dropped late data from a windowed
>>> operation, which was the original motivation for adding side outputs.
>>> 
>>> Split/select consists of two API calls: DataStream.split(OutputSelector)
>>> and SplitStream.select(). You can use it like this:
>>> 
>>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>> 
>>> final String EVEN_SELECTOR = "even";
>>> final String ODD_SELECTOR = "odd";
>>> 
>>> SplitStream<Integer> split = input.split(
>>>        new OutputSelector<Integer>() {
>>>            @Override
>>>            public Iterable<String> select(Integer value) {
>>>                if (value % 2 == 0) {
>>>                    return Collections.singleton(EVEN_SELECTOR);
>>>                } else {
>>>                    return Collections.singleton(ODD_SELECTOR);
>>>                }
>>>            }
>>>        });
>>> 
>>> DataStream<Integer> evenStream = split.select(EVEN_SELECTOR);
>>> DataStream<Integer> oddStream = split.select(ODD_SELECTOR);
>>> 
>>> The stream is split according to an OutputSelector that returns an
>> Iterable
>>> of Strings. Then you can use select() to get a new stream that only
>>> contains elements with the given selector. Notice how the element type
>> for
>>> all the split streams is the same.
>>> 
>>> The new side output API proposal adds a new type OutputTag<T> and relies
>> on
>>> extending ProcessFunction to allow emitting data to outputs besides the
>>> main output. I think it's best explained with an example as well:
>>> 
>>> DataStreamSource<Integer> input = env.fromElements(1, 2, 3);
>>> 
>>> final OutputTag<String> sideOutput1 = new OutputTag<>("side-output-1"){}
>> ;
>>> final OutputTag<Integer> sideOutput2 = new OutputTag<>("side-output-2"){}
>> ;
>>> 
>>> SingleOutputStreamOperator<String> mainOutputStream = input
>>>        .process(new ProcessFunction<Integer, String>() {
>>> 
>>>            @Override
>>>            public void processElement(
>>>                    Integer value,
>>>                    Context ctx,
>>>                    Collector<String> out) throws Exception {
>>> 
>>>                ctx.output(sideOutput1, "WE GOT: " + value);
>>>                ctx.output(sideOutput2, value);
>>>                out.collect("MAIN OUTPUT: " + value);
>>>            }
>>> 
>>>        });
>>> 
>>> DataStream<String> sideOutputStream1 =
>>> mainOutputStream.getSideOutput(sideOutput1);
>>> DataStream<Integer> sideOutputStream2 =
>>> mainOutputStream.getSideOutput(sideOutput2);
>>> 
>>> Notice how the OutputTags are anonymous inner classes, similar to
>> TypeHint.
>>> We need this to be able to analyse the type of the side-output streams.
>>> Also notice, how the types of the side-output streams can be independent
>> of
>>> the main-output stream, also notice how everything is correctly type
>>> checked by the Java Compiler.
>>> 
>>> This change requires making ProcessFunction an abstract base class so
>> that
>>> not every user has to implement the onTimer() method. We would also need
>> to
>>> allow ProcessFunction on a non-keyed stream.
>>> 
>>> Chen also implemented an API based on FlatMapFunction that looks like the
>>> one proposed in the FLIP. This relies on CollectorWrapper, which can be
>>> used to "pimp" a Collector to also allow emitting to side outputs.
>>> 
>>> For WindowedStream we have two proposals: make OutputTag visible on the
>>> WindowedStream API or make the result type of WindowedStream operations
>>> more specific to allow a getDroppedDataSideOutput() method. For the first
>>> proposal it would look like this:
>>> 
>>> final OutputTag<String> lateDataTag = new OutputTag<>("side-output-1"){}
>> ;
>>> 
>>> DataStream<T> windowedResult = input
>>>  .keyBy(...)
>>>  .window(...)
>>>  .sideOutputLateData(lateDataTag)
>>>  .apply(...)
>>> 
>>> DataStream<IN> lateData = windowedResult.getSideOutput(lateDataTag);
>>> 
>>> For the second proposal it would look like this:
>>> 
>>> WindowedOperator<T> windowedResult = input
>>>  .keyBy(...)
>>>  .window(...)
>>>  .apply(...)
>>> 
>>> DataStream<IN> lateData = windowedResult.getSideOutput();
>>> 
>>> Right now, the result of window operations is a
>>> SingleOutputStreamOperator<T>, same as it is for all DataStream
>> operations.
>>> Making the result type more specific, i.e. a WindowedOperator, would
>> allow
>>> us to add extra methods there. This would require wrapping a
>>> SingleOutputStreamOperator and forwarding all the method calls to the
>>> wrapped operator which can be a bit of a hassle for future changes. The
>>> first proposal requires additional boilerplate code.
>>> 
>>> Sorry for the long mail but I think it's necessary to get everyone on the
>>> same page. The question is now: how should we proceed with the proposed
>> API
>>> and the old split/select API? I propose to deprecate split/select and
>> only
>>> have side outputs, going forward. Of course, I'm a bit biased on this.
>> ;-)
>>> If we decide to do this, we also need to decide on what the side output
>> API
>>> should look like.
>>> 
>>> Happy discussing! Feedback very welcome. :-)
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>> [1]
>>> 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
>> 13+Side+Outputs+in+Flink
>>> 
>> 
> 
> 
> 
> -- 
> 
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> jamie@data-artisans.com


Re: [DISCUSS] Side Outputs and Split/Select

Posted by Jamie Grier <ja...@data-artisans.com>.
I prefer the ProcessFunction and side outputs solution over split() and
select(), which I've never liked, primarily due to the lack of type safety.
It also doesn't really seem to fit with the rest of Flink's API.

On the late data question, I strongly prefer the late data concept being
explicit in the API. Could we not also do something like:

WindowedStream<> windowedStream = input
  .keyBy(...)
  .window(...);

DataStream<> mainOutput = windowedStream
  .apply(...);

DataStream<> lateOutput = windowedStream
  .lateStream();
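
To make the idea of an explicit late stream concrete, here is a minimal
plain-Java sketch. It does not use Flink at all: TumblingWindowSketch,
onElement, onWatermark, mainOutput and lateOutput are hypothetical names,
and the rule "an element is late once the watermark has reached the end of
its window" is a simplifying assumption (no allowed lateness, no keys).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy tumbling-window sum that keeps late elements instead of dropping them. Not Flink code. */
public class TumblingWindowSketch {
    private final long windowSize;
    private long watermark = Long.MIN_VALUE;
    // Windows that have not fired yet: window start -> running sum.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    private final List<String> mainOutput = new ArrayList<>();
    private final List<Long> lateOutput = new ArrayList<>();

    public TumblingWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds the element to its window, or to the late output if that window already fired. */
    public void onElement(long timestamp, long value) {
        long start = Math.floorDiv(timestamp, windowSize) * windowSize;
        long end = start + windowSize;
        if (end <= watermark) {
            lateOutput.add(value);                      // assumption: late means "window already fired"
        } else {
            openWindows.merge(start, value, Long::sum);
        }
    }

    /** Advances the watermark and fires every window whose end it has reached. */
    public void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!openWindows.isEmpty() && openWindows.firstKey() + windowSize <= watermark) {
            Map.Entry<Long, Long> fired = openWindows.pollFirstEntry();
            long start = fired.getKey();
            mainOutput.add("[" + start + "," + (start + windowSize) + ") sum=" + fired.getValue());
        }
    }

    public List<String> mainOutput() { return mainOutput; }

    public List<Long> lateOutput() { return lateOutput; }

    public static void main(String[] args) {
        TumblingWindowSketch w = new TumblingWindowSketch(10);
        w.onElement(1, 5);
        w.onElement(7, 3);
        w.onWatermark(10);    // fires window [0,10)
        w.onElement(4, 100);  // belongs to [0,10), which already fired
        System.out.println(w.mainOutput());  // [[0,10) sum=8]
        System.out.println(w.lateOutput());  // [100]
    }
}
```

The point of the sketch is only that the late elements have their own
output with the input element type, separate from the window results.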







-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com

Re: [DISCUSS] Side Outputs and Split/Select

Posted by Gyula Fóra <gy...@apache.org>.
Hi,

Thanks for the nice proposal. I like the idea of side outputs; it would
make a lot of topologies much simpler.

Regarding the API, I think we should come up with a way of making side
outputs accessible from all sorts of operators in a similar way. For
instance, through the RichFunction interface with a special collector that
we invalidate when the user should not be collecting to it. (just a quick
idea)
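
A rough plain-Java sketch of such an invalidatable collector, just to
illustrate the idea. Nothing here is Flink API: SideCollectorSketch, Tag,
collect, invalidate and getSideOutput are hypothetical names, and outputs
are kept in in-memory lists for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical side-output collector that can be invalidated. Not Flink code. */
public class SideCollectorSketch {

    /** Typed handle for one side output; T ties the tag to its element type. */
    public static final class Tag<T> {
        private final String id;

        public Tag(String id) { this.id = id; }

        public String id() { return id; }
    }

    private final Map<String, List<Object>> outputs = new HashMap<>();
    private boolean valid = true;

    /** Emits a value to the side output named by the tag; fails once invalidated. */
    public <T> void collect(Tag<T> tag, T value) {
        if (!valid) {
            throw new IllegalStateException("collector must not be used at this point");
        }
        outputs.computeIfAbsent(tag.id(), k -> new ArrayList<>()).add(value);
    }

    /** Called by the (imagined) runtime when the user should stop collecting. */
    public void invalidate() { valid = false; }

    /** Returns everything collected for the tag. Assumes each id is used with one type only. */
    @SuppressWarnings("unchecked")
    public <T> List<T> getSideOutput(Tag<T> tag) {
        return (List<T>) (List<?>) outputs.getOrDefault(tag.id(), new ArrayList<>());
    }

    public static void main(String[] args) {
        Tag<String> messages = new Tag<>("messages");
        Tag<Integer> numbers = new Tag<>("numbers");

        SideCollectorSketch collector = new SideCollectorSketch();
        collector.collect(messages, "WE GOT: 1");
        collector.collect(numbers, 1);
        // collector.collect(numbers, "oops");  // would not compile: String is not an Integer

        collector.invalidate();
        try {
            collector.collect(numbers, 2);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(collector.getSideOutput(messages));  // [WE GOT: 1]
        System.out.println(collector.getSideOutput(numbers));   // [1]
    }
}
```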

I personally wouldn't deprecate the "universal" Split/Select API that can
be used on any DataStream in favor of functionality that is only
accessible through the process function or a few select operators. I think
the Split/Select pattern is also very nice and I use it in many different
contexts to get efficient multiway filtering (after map/co operators, for
example).
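
For readers who have not used the pattern, here is a toy plain-Java model
of multiway filtering in the split/select style. It is not Flink code:
SplitSelectSketch is a hypothetical class that works on an in-memory list,
with the selector modeled as a Function returning the output names for
each element.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of split/select on an in-memory list. Not Flink code. */
public class SplitSelectSketch<T> {
    private final Map<String, List<T>> byName = new HashMap<>();

    /** Routes every element to all outputs named by the selector, in a single pass. */
    public SplitSelectSketch(List<T> input, Function<T, Iterable<String>> selector) {
        for (T element : input) {
            for (String name : selector.apply(element)) {
                byName.computeIfAbsent(name, k -> new ArrayList<>()).add(element);
            }
        }
    }

    /** Returns the elements routed to the given name, or an empty list if there are none. */
    public List<T> select(String name) {
        return byName.getOrDefault(name, Collections.emptyList());
    }

    public static void main(String[] args) {
        // Output of an imagined "map" step: the squares 1, 4, 9, 16, 25, 36.
        List<Integer> mapped = new ArrayList<>();
        for (int i = 1; i <= 6; i++) {
            mapped.add(i * i);
        }

        SplitSelectSketch<Integer> split = new SplitSelectSketch<Integer>(mapped, v -> {
            List<String> names = new ArrayList<>();
            names.add(v % 2 == 0 ? "even" : "odd");
            if (v > 10) {
                names.add("large");  // an element may go to more than one output
            }
            return names;
        });

        System.out.println(split.select("even"));   // [4, 16, 36]
        System.out.println(split.select("odd"));    // [1, 9, 25]
        System.out.println(split.select("large"));  // [16, 25, 36]
    }
}
```

The selector runs once per element and all outputs share the element
type, which is what makes this a compact alternative to several
independent filters.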

Regards,
Gyula
