Posted to dev@beam.apache.org by Alexey Demin <di...@gmail.com> on 2016/11/28 06:33:31 UTC

Flink runner. Optimization for sideOutput with tags

Hi

If we try to use sideOutput with a TupleTag while the Flink option enableObjectReuse is set,
we get the following stack trace:

        at org.apache.beam.sdk.transforms.DoFnAdapters$SimpleDoFnAdapter.processElement(DoFnAdapters.java:234)
        at org.apache.beam.runners.core.SimpleOldDoFnRunner.invokeProcessElement(SimpleOldDoFnRunner.java:118)
        at org.apache.beam.runners.core.SimpleOldDoFnRunner.processElement(SimpleOldDoFnRunner.java:104)
        at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:106)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:265)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at *org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)*
        at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:566)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:330)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:315)
        at *org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:427)*
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:415)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$MultiOutputOutputManagerFactory$1.output(DoFnOperator.java:459)
        at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnContext.outputWindowedValue(SimpleOldDoFnRunner.java:270)
        at org.apache.beam.runners.core.SimpleOldDoFnRunner$DoFnProcessContext.output(SimpleOldDoFnRunner.java:412)
        at org.apache.beam.sdk.transforms.DoFnAdapters$ProcessContextAdapter.output(DoFnAdapters.java:406)

The most interesting parts are CopyingBroadcastingOutputCollector and
ParDoBoundMultiStreamingTranslator$1.flatMap(FlinkStreamingTransformTranslators.java:570)

For the flat map:

        @SuppressWarnings("unchecked")
        DataStream filtered =
            unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
              @Override
              public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
                if (value.getUnionTag() == outputTag) {
                  out.collect(value.getValue());
                }
              }
            }).returns(outputTypeInfo);

As a result we always serialize and deserialize messages, even when there is only one
collector per tag.

CopyingBroadcastingOutputCollector does already have a non-copying path for the case
where only one collector is attached to the output (the last output never gets a copy):

        public void collect(StreamRecord<T> record) {
          for (int i = 0; i < outputs.length - 1; i++) {
            Output<StreamRecord<T>> output = outputs[i];
            StreamRecord<T> shallowCopy = record.copy(record.getValue());
            output.collect(shallowCopy);
          }

          // don't copy for the last output
          outputs[outputs.length - 1].collect(record);
        }

Would it be more efficient to do the tag filtering before
CopyingBroadcastingOutputCollector and avoid the unnecessary work of copying
the data?
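The cost difference can be sketched in plain Java (hypothetical classes, not the actual Beam or Flink APIs): with one filter per tag, every filter inspects every record of the union stream, while tag-based routing dispatches each record exactly once.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the two demultiplexing strategies discussed above.
public class UnionDemuxSketch {
    static final class RawUnion {
        final int tag;
        final Object value;
        RawUnion(int tag, Object value) { this.tag = tag; this.value = value; }
    }

    // Current translation: one filter per tag, each seeing every record.
    // Returns how many records the filters had to inspect in total.
    static int broadcastInspections(List<RawUnion> stream, int numTags) {
        int inspected = 0;
        for (int tag = 0; tag < numTags; tag++) {
            for (RawUnion r : stream) {
                inspected++;               // every filter sees every record
                // if (r.tag == tag) out.collect(r.value);
            }
        }
        return inspected;
    }

    // Tag-based routing: each record is dispatched once, by its tag.
    static int routedInspections(List<RawUnion> stream, Map<Integer, Consumer<Object>> outputs) {
        int inspected = 0;
        for (RawUnion r : stream) {
            inspected++;                   // dispatched exactly once
            outputs.getOrDefault(r.tag, v -> { }).accept(r.value);
        }
        return inspected;
    }

    public static void main(String[] args) {
        List<RawUnion> stream = Arrays.asList(
            new RawUnion(0, "main"), new RawUnion(1, "side"), new RawUnion(0, "main2"));
        System.out.println(broadcastInspections(stream, 2)); // 6: 3 records x 2 filters
        System.out.println(routedInspections(stream, new HashMap<>())); // 3: each record once
    }
}
```

With N output tags, the broadcast strategy does N passes over every element; routing does one.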


Thanks
Alexey Diomin

Re: Flink runner. Optimization for sideOutput with tags

Posted by Aljoscha Krettek <al...@apache.org>.
I'm having a look at your PRs now. I think the change is good, and it's
actually quite simple too.

Thanks for looking into this!

On Mon, 5 Dec 2016 at 05:48 Alexey Demin <di...@gmail.com> wrote:

> [quoted message trimmed]

Re: Flink runner. Optimization for sideOutput with tags

Posted by Alexey Demin <di...@gmail.com>.
Aljoscha

I was mistaken about the Flink runtime =)

What do you think about the following modification to FlinkStreamingTransformTranslators?

Move the split out of the for-loop:

SplitStream<RawUnionValue> splitStream =
    unionOutputStream.split(new OutputSelector<RawUnionValue>() {
      @Override
      public Iterable<String> select(RawUnionValue value) {
        return Lists.newArrayList(String.valueOf(value.getUnionTag()));
      }
    });

and change filtered to:

DataStream filtered = splitStream.select(String.valueOf(outputTag))
    .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
      @Override
      public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
        out.collect(value.getValue());
      }
    }).returns(outputTypeInfo);

With this implementation, data is transferred only to the output that needs it,
instead of broadcasting every element to all outputs.

P.S. I know this code is not production ready; it's only an idea for discussion.
But for people who use a side output only for alerting it can reduce CPU usage
(serialization is applied only to the targeted value, not to every element on
every output).
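The routing idea behind the OutputSelector can be sketched without a Flink dependency (plain Java with hypothetical types; records are named after their union tag, and each downstream output only receives its own tag name):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of split/select-style routing by union tag.
public class SplitSelectSketch {
    // Minimal stand-in for Flink's OutputSelector: names each record.
    interface OutputSelector<T> { Iterable<String> select(T value); }

    // Deliver each record only to the named outputs it was selected for.
    static <T> Map<String, List<T>> route(List<T> stream, OutputSelector<T> selector) {
        Map<String, List<T>> routed = new HashMap<>();
        for (T value : stream) {
            for (String name : selector.select(value)) {
                routed.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        // Each int[] is {unionTag, payload}.
        List<int[]> stream = Arrays.asList(
            new int[]{0, 10}, new int[]{1, 20}, new int[]{0, 30});
        // Name each record after its union tag, like the proposed OutputSelector.
        Map<String, List<int[]>> routed =
            route(stream, v -> Collections.singletonList(String.valueOf(v[0])));
        System.out.println(routed.get("0").size()); // 2 records carried tag 0
        System.out.println(routed.get("1").size()); // 1 record carried tag 1
    }
}
```

Each record is dispatched exactly once; the per-tag flatMap then only unwraps values it actually receives.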

Thanks,
Alexey Diomin


2016-12-04 23:57 GMT+04:00 Alexey Demin <di...@gmail.com>:

> [quoted message trimmed]

Re: Flink runner. Optimization for sideOutput with tags

Posted by Alexey Demin <di...@gmail.com>.
Hi

A very simple example:
https://gist.github.com/xhumanoid/287af191314d5d867acf509129bd4931

Sometimes we need meta-information about the element being processed.

If I understood the code in FlinkStreamingTransformTranslators (around line 557)
correctly, the main problem is not in the translators but in the Flink runtime,
which doesn't know about tags and simply broadcasts whenever one transformation
has two outputs.

Correct me if I'm mistaken.


>> this is a bit of a dangerous setting

I know about the dangers of object reuse, but we never touch an object after
collecting it. In some cases we need more performance, and serialization on
every transformation is very expensive; yet merging all the business logic
into one DoFn would make the processing unmaintainable.
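The hazard can be sketched outside of Flink (plain Java with a hypothetical record class, not Flink's actual collector): if a mutable record is collected by reference and then reused, a downstream consumer that kept the reference observes the later mutation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of why enableObjectReuse is dangerous: a collector
// that stores references (no defensive copy) exposes later mutations of a
// reused record object to downstream consumers.
public class ObjectReuseSketch {
    static final class Record { int value; }

    public static void main(String[] args) {
        List<Record> downstream = new ArrayList<>();
        Record reused = new Record();

        reused.value = 1;
        downstream.add(reused);   // "collect" without copying
        reused.value = 2;         // the runtime reuses the same object
        downstream.add(reused);

        // Both entries alias one object, so the first emission is lost.
        System.out.println(downstream.get(0).value); // 2, not 1
    }
}
```

This is why it's only safe when, as above, no code touches an object after emitting it.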

>> I think your stack trace is not complete, at least I can't seem to see
the root exception.

We captured this stack trace on a live system with jstack; it's not an exception.

Thanks,
Alexey Diomin


2016-11-29 21:33 GMT+04:00 Aljoscha Krettek <al...@apache.org>:

> [quoted message trimmed]

Re: Flink runner. Optimization for sideOutput with tags

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Alexey,
I think it should be possible to optimise this particular transformation by
using a split/select pattern in Flink. (See split and select here:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#datastream-transformations).
The current implementation is not very optimised; my main goal was to make
all features of Beam work before going into individual optimisations.

About object-reuse in Flink Streaming: this is a bit of a dangerous setting
and can lead to unexpected results with certain patterns. I think your
stack trace is not complete, at least I can't seem to see the root
exception.

Cheers,
Aljoscha

On Mon, 28 Nov 2016 at 07:33 Alexey Demin <di...@gmail.com> wrote:

> [quoted message trimmed]