Posted to dev@beam.apache.org by Jozef Vilcek <jo...@gmail.com> on 2019/05/01 11:24:15 UTC

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <ke...@apache.org> wrote:

>
>
> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
>
>> In that case, Robert's point is quite valid. The old Flink runner I
>> believe had no knowledge of fusion, which was known to make it extremely
>> slow. A lot of work went into making the portable runner fusion aware, so
>> we don't need to round trip through coders on every ParDo.
>>
>
> The old Flink runner got fusion for free, since Flink does it. The new
> fusion in portability is because fusing the runner side of portability
> steps does not achieve real fusion
>

Aha, I see. So the feature in Flink is operator chaining, and by default
Flink copies input elements between chained operators. With Beam coders,
that copy seems to be more noticeable than in native Flink.
So do I get it right that in the portable runner scenario, you do similar
chaining via this "fusion of stages"? I am curious: how is it different
from chaining, such that the runner can be sure that skipping the copy is
"safe" with respect to user-defined functions and their behaviour over inputs?
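
To make the "safe" question concrete, here is a minimal sketch of one thing
that can go wrong when elements are handed between chained steps without a
copy. Event, BufferingStep and ChainingDemo are hypothetical names used only
for illustration, not Beam or Flink classes, and the upstream step is assumed
to reuse a single mutable instance for every output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record; not a Beam or Flink type.
final class Event {
    String payload;
    Event(String payload) { this.payload = payload; }
    Event copy() { return new Event(payload); }
}

// Downstream step that keeps references to the elements it receives.
final class BufferingStep {
    final List<Event> buffer = new ArrayList<>();
    void process(Event e) { buffer.add(e); }
}

final class ChainingDemo {
    // The upstream step reuses one Event instance for all of its outputs.
    static List<String> run(boolean copyBetweenSteps) {
        BufferingStep downstream = new BufferingStep();
        Event reused = new Event("");
        for (String s : new String[] {"a", "b", "c"}) {
            reused.payload = s;
            // With a copy, downstream owns its element; without, it aliases 'reused'.
            downstream.process(copyBetweenSteps ? reused.copy() : reused);
        }
        List<String> seen = new ArrayList<>();
        for (Event e : downstream.buffer) {
            seen.add(e.payload);
        }
        return seen;
    }
}
```

With the copy, the downstream buffer holds three distinct payloads; without
it, all three buffered entries alias the same object and show only the last
payload written.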


>
>> Reuven
>>
>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jo...@gmail.com>
>> wrote:
>>
>>> It was not a portable Flink runner.
>>>
>>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my
>>> findings and send them out
>>>
>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Jozef did you use the portable Flink runner or the old one?
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>>> to date has been on feature parity, not performance parity, but we're
>>>>> at the point that the latter should be tackled as well. Even if there
>>>>> is a slight overhead (and there's talk about integrating more deeply
>>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>>> addressed. I wonder if this is one of those cases where using the
>>>>> portability framework could be a performance win (specifically, no
>>>>> cloning would happen between operators of fused stages, and the
>>>>> cloning between operators could be on the raw bytes[] (if needed at
>>>>> all, because we know they wouldn't be mutated)).
>>>>>
>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>> >
>>>>> > Specifically, a lot of shared code assumes that repeatedly setting a
>>>>> timer is nearly free / the same cost as determining whether or not to set
>>>>> the timer. ReduceFnRunner has been refactored in a way so it would be very
>>>>> easy to set the GC timer once per window that occurs in a bundle, but
>>>>> there's probably some underlying inefficiency around why this isn't cheap
>>>>> that would be a bigger win.
>>>>> >
>>>>> > Kenn
>>>>> >
>>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com>
>>>>> wrote:
>>>>> >>
>>>>> >> I think the short answer is that folks working on the Beam Flink
>>>>> runner have mostly been focused on getting everything working, and so have
>>>>> not dug into this performance too deeply. I suspect that there is
>>>>> low-hanging fruit to optimize as a result.
>>>>> >>
>>>>> >> You're right that ReduceFnRunner schedules a timer for each
>>>>> element. I think this code dates back to before Beam; on Dataflow timers
>>>>> are identified by tag, so this simply overwrites the existing timer which
>>>>> is very cheap in Dataflow. If it is not cheap on Flink, this might be
>>>>> something to optimize.
>>>>> >>
>>>>> >> Reuven
>>>>> >>
>>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jo...@gmail.com>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Hello,
>>>>> >>>
>>>>> >>> I am interested in any knowledge or thoughts on what the overhead
>>>>> is, or should be, of running Beam pipelines instead of pipelines written
>>>>> on a "bare runner". Is this something that is being tested or investigated
>>>>> by the community? Is there a consensus on what bounds the overhead should
>>>>> typically stay within? I realise this is very runner specific, but certain
>>>>> things are also imposed by the SDK model itself.
>>>>> >>>
>>>>> >>> I tested a simple streaming pipeline on Flink vs Beam-Flink and
>>>>> found very noticeable differences. I want to stress that it was not a
>>>>> performance test. The job does the following:
>>>>> >>>
>>>>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation
>>>>> errors -> Reshuffle -> Report counter.inc() to metrics for throughput
>>>>> >>>
>>>>> >>> Both jobs had the same configuration and the same state backend with
>>>>> the same checkpointing strategy. What I noticed from a few simple test runs:
>>>>> >>>
>>>>> >>> * first run on Flink 1.5.0: from CPU profiles on one worker I
>>>>> found that ~50% of the time was spent either on removing timers from
>>>>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>>>>> CoderUtils.clone()
>>>>> >>>
>>>>> >>> * the problem with timer deletion was addressed by FLINK-9423. I
>>>>> retested on Flink 1.7.2 and not much time is spent in timer deletion
>>>>> now, but the root cause was not removed. Timers are still frequently
>>>>> registered and removed (I believe from
>>>>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is called
>>>>> per processed element?), which is noticeable in GC activity, heap and
>>>>> state ...
>>>>> >>>
>>>>> >>> * in Flink I use the FileSystem state backend, which keeps state in
>>>>> an in-memory CopyOnWriteStateTable, which after some time is full of
>>>>> PaneInfo objects. Maybe they come from PaneInfoTracker activity
>>>>> >>>
>>>>> >>> * Coder clone is painful. A pure Flink job copies between
>>>>> operators too, in my case via Kryo.copy(), but this is not noticeable
>>>>> in the CPU profile. Kryo.copy() copies at the object level, not object ->
>>>>> bytes -> object, which is cheaper
>>>>> >>>
>>>>> >>> Overall, my observation is that pure Flink can be roughly 3x
>>>>> faster.
>>>>> >>>
>>>>> >>> I do not know what I am trying to achieve here :) Probably just
>>>>> start a discussion and collect thoughts and other experiences on the cost
>>>>> of running some data processing on Beam and particular runner.
>>>>> >>>
>>>>>
>>>>

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Jozef Vilcek <jo...@gmail.com>.
Well, I did not do a proper perf test. What I am saying is that my
observations are:

* The native Flink job does copy its inputs, but judging by stack-trace
  profiling snapshots, most CPU time goes to inflating bytes read from Kafka
* When running the Beam pipeline on Flink, the Coder copy shows up among the
  top CPU consumers

I am just speculating here. Flink's "coders" have both a
serialise/deserialise option and a copy option. They also have an
isImmutableType() hint, so they have more potential to be efficient.
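
As a rough illustration of why such a hint can help, here is a minimal
sketch. ElementCopier and Copiers are hypothetical names invented for this
example and are not Flink's or Beam's actual interfaces; the sketch only
assumes a serializer-like object that exposes an immutability flag and an
object-level copy.

```java
// Hypothetical interface, loosely inspired by the idea described above.
interface ElementCopier<T> {
    boolean isImmutableType();
    T copy(T value);
}

final class Copiers {
    // Strings are immutable, so sharing the reference is safe.
    static final ElementCopier<String> STRING = new ElementCopier<String>() {
        public boolean isImmutableType() { return true; }
        public String copy(String value) { return value; }
    };

    // StringBuilder is mutable, so a real object-level copy is needed.
    static final ElementCopier<StringBuilder> STRING_BUILDER = new ElementCopier<StringBuilder>() {
        public boolean isImmutableType() { return false; }
        public StringBuilder copy(StringBuilder value) { return new StringBuilder(value); }
    };

    // Hand an element to the next chained step, copying only when required.
    static <T> T handOver(ElementCopier<T> copier, T value) {
        return copier.isImmutableType() ? value : copier.copy(value);
    }
}
```

In this sketch the immutable case costs nothing at all, while the mutable
case still avoids going through an intermediate byte buffer.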


On Fri, May 3, 2019 at 2:01 PM Maximilian Michels <mx...@apache.org> wrote:

> Misread your post. You're saying that Kryo is more efficient than a
> roundtrip obj->bytes->obj_copy. Still, most types use Flink's
> serializers which also do the above roundtrip. So I'm not sure this
> performance advantage holds true for other Flink jobs.
>
> On 02.05.19 20:01, Maximilian Michels wrote:
> >> I am not sure what you are referring to here. What do you mean, Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >
> > Flink uses Kryo as a fallback serializer when its own type serialization
> > system can't analyze the type. I'm just guessing here that this could be
> > slower.
> >
> > On 02.05.19 16:51, Jozef Vilcek wrote:
> >>
> >>
> >> On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <mxm@apache.org>
> >> wrote:
> >>
> >>     Thanks for the JIRA issues Jozef!
> >>
> >>      > So the feature in Flink is operator chaining, and by default
> >>     Flink copies input elements between chained operators. With Beam
> >>     coders, that copy seems to be more noticeable than in native Flink.
> >>
> >>     Copying between chained operators can be turned off in the
> >>     FlinkPipelineOptions (if you know what you're doing).
> >>
> >>
> >> Yes, I know that it can be instructed to reuse objects (if you are
> >> referring to this). I am just not sure I want to open this door in
> >> general :)
> >> But it is interesting to learn that, with portability, this will be
> >> turned on by default. Quite an important finding imho.
> >>
> >>     Beam coders should
> >>     not be slower than Flink's. They are simply wrapped. It seems Kryo is
> >>     simply slower, which we could fix by providing more type hints to
> >>     Flink.
> >>
> >>
> >> I am not sure what you are referring to here. What do you mean, Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >>
> >>     -Max
> >>
> >>     On 02.05.19 13:15, Robert Bradshaw wrote:
> >>      > Thanks for filing those.
> >>      >
> >>      > As for how not doing a copy is "safe," it's not really. Beam simply
> >>      > asserts that you MUST NOT mutate your inputs (and direct runners,
> >>      > which are used during testing, do perform extra copies and checks to
> >>      > catch violations of this requirement).
> >>      >
> >>      > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
> >>     <jozo.vilcek@gmail.com> wrote:
> >>      >>
> >>      >> I have created
> >>      >> https://issues.apache.org/jira/browse/BEAM-7204
> >>      >> https://issues.apache.org/jira/browse/BEAM-7206
> >>      >>
> >>      >> to track these topics further
> >>      >>
>

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Maximilian Michels <mx...@apache.org>.
Misread your post. You're saying that Kryo is more efficient than a
roundtrip obj->bytes->obj_copy. Still, most types use Flink's
serializers which also do the above roundtrip. So I'm not sure this
performance advantage holds true for other Flink jobs.
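
For readers unfamiliar with the two strategies being compared, here is a
minimal sketch using only java.io. Reading and CloneStrategies are
hypothetical names for illustration; this is not the code of
CoderUtils.clone() or Kryo.copy(), only an example of the general shape of a
byte round trip versus an object-level copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record type used only for this illustration.
final class Reading {
    final String sensor;
    final long value;
    Reading(String sensor, long value) { this.sensor = sensor; this.value = value; }
}

final class CloneStrategies {
    // obj -> bytes -> obj_copy: encode every field, then decode it again.
    static Reading cloneViaBytes(Reading in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(in.sensor);
                out.writeLong(in.value);
            }
            try (DataInputStream src =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return new Reading(src.readUTF(), src.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Object-level copy: no intermediate byte buffer is allocated or parsed.
    static Reading cloneDirect(Reading in) {
        return new Reading(in.sensor, in.value);
    }
}
```

Both methods return an equivalent new object; the round trip additionally
allocates and fills a byte buffer per element, which is the kind of work that
would show up as ByteArrayOutputStream time in a CPU profile.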


Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, May 3, 2019 at 9:29 AM Viliam Durina <vi...@hazelcast.com> wrote:
>
> > you MUST NOT mutate your inputs
> I think it's enough to not mutate the inputs after you emit them. From this follows that when you receive an input, the upstream vertex will not try to mutate it in parallel. This is what Hazelcast Jet expects. We have no option to automatically clone objects after each step.

There's also the case of sibling fusion. E.g. if your graph looks like

   ---> B
 /
A
 \
   ---> C

which all gets fused together, then both B and C are applied to each
output of A, which means it is not safe for B and C to mutate their
inputs lest its sibling (whichever is applied second) see this
mutation.
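
A minimal sketch of the sibling case described above, in plain Java rather
than Beam. SiblingFusionDemo, stepB and stepC are hypothetical names; the
sketch assumes A emits a mutable list and that B breaks the rule by mutating
its input in place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SiblingFusionDemo {
    // B (incorrectly) mutates its input in place and returns its size.
    static int stepB(List<String> element) {
        element.add("added-by-B");
        return element.size();
    }

    // C only reads its input.
    static int stepC(List<String> element) {
        return element.size();
    }

    // Fused execution: B and C are both applied to the same output of A.
    static int[] runFused(boolean copyPerConsumer) {
        List<String> fromA = new ArrayList<>(Arrays.asList("x", "y"));
        int b = stepB(copyPerConsumer ? new ArrayList<>(fromA) : fromA);
        int c = stepC(copyPerConsumer ? new ArrayList<>(fromA) : fromA);
        return new int[] {b, c};
    }
}
```

Without a copy per consumer, C observes the element B appended and reports a
size of 3 instead of 2; with a copy, each sibling sees the original output
of A.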

> On Thu, 2 May 2019 at 20:01, Maximilian Michels <mx...@apache.org> wrote:
>>
>> > I am not sure what you are referring to here. What do you mean, Kryo is simply slower ... Beam Kryo or Flink Kryo or?
>>
>> Flink uses Kryo as a fallback serializer when its own type serialization
>> system can't analyze the type. I'm just guessing here that this could be
>> slower.
>>
>> On 02.05.19 16:51, Jozef Vilcek wrote:
>> >
>> >
>> > On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <mxm@apache.org>
>> > wrote:
>> >
>> >     Thanks for the JIRA issues Jozef!
>> >
>> >      > So the feature in Flink is operator chaining, and by default
>> >     Flink copies input elements between chained operators. With Beam
>> >     coders, that copy seems to be more noticeable than in native Flink.
>> >
>> >     Copying between chained operators can be turned off in the
>> >     FlinkPipelineOptions (if you know what you're doing).
>> >
>> >
>> > Yes, I know that it can be instructed to reuse objects (if you are
>> > referring to this). I am just not sure I want to open this door in
>> > general :)
>> > But it is interesting to learn that, with portability, this will be
>> > turned on by default. Quite an important finding imho.
>> >
>> >     Beam coders should
>> >     not be slower than Flink's. They are simply wrapped. It seems Kryo is
>> >     simply slower, which we could fix by providing more type hints to Flink.
>> >
>> >
>> > I am not sure what you are referring to here. What do you mean, Kryo is
>> > simply slower ... Beam Kryo or Flink Kryo or?
>> >
>> >     -Max
>> >
>> >     On 02.05.19 13:15, Robert Bradshaw wrote:
>> >      > Thanks for filing those.
>> >      >
>> >      > As for how not doing a copy is "safe," it's not really. Beam simply
>> >      > asserts that you MUST NOT mutate your inputs (and direct runners,
>> >      > which are used during testing, do perform extra copies and checks to
>> >      > catch violations of this requirement).
>> >      >
>> >      > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
>> >     <jozo.vilcek@gmail.com> wrote:
>> >      >>
>> >      >> I have created
>> >      >> https://issues.apache.org/jira/browse/BEAM-7204
>> >      >> https://issues.apache.org/jira/browse/BEAM-7206
>> >      >>
>> >      >> to track these topics further
>> >      >>
>> >      >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek
>> >     <jozo.vilcek@gmail.com> wrote:
>> >      >>>
>> >      >>>
>> >      >>>
>> >      >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles
>> >     <kenn@apache.org> wrote:
>> >      >>>>
>> >      >>>>
>> >      >>>>
>> >      >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <relax@google.com
>> >     <ma...@google.com>> wrote:
>> >      >>>>>
>> >      >>>>> In that case, Robert's point is quite valid. The old Flink
>> >     runner I believe had no knowledge of fusion, which was known to make
>> >     it extremely slow. A lot of work went into making the portable
>> >     runner fusion aware, so we don't need to round trip through coders
>> >     on every ParDo.
>> >      >>>>
>> >      >>>>
>> >      >>>> The old Flink runner got fusion for free, since Flink does it.
>> >     The new fusion in portability is because fusing the runner side of
>> >     portability steps does not achieve real fusion
>> >      >>>
>> >      >>>
>> >      >>> Aha, I see. So the feature in Flink is operator chaining and
>> >     Flink per default initiate copy of input elements. In case of Beam
>> >     coders copy seems to be more noticable than native Flink.
>> >      >>> So do I get it right that in portable runner scenario, you do
>> >     similar chaining via this "fusion of stages"? Curious here... how is
>> >     it different from chaining so runner can be sure that not doing copy
>> >     is "safe" with respect to user defined functions and their behaviour
>> >     over inputs?
>> >      >>>
>> >      >>>>>
>> >      >>>>>
>> >      >>>>> Reuven
>> >      >>>>>
>> >      >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek
>> >     <jozo.vilcek@gmail.com <ma...@gmail.com>> wrote:
>> >      >>>>>>
>> >      >>>>>> It was not a portable Flink runner.
>> >      >>>>>>
>> >      >>>>>> Thanks all for the thoughts, I will create JIRAs, as
>> >     suggested, with my findings and send them out
>> >      >>>>>>
>> >      >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax
>> >     <relax@google.com <ma...@google.com>> wrote:
>> >      >>>>>>>
>> >      >>>>>>> Jozef did you use the portable Flink runner or the old one?
>> >      >>>>>>>
>> >      >>>>>>> Reuven
>> >      >>>>>>>
>> >      >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw
>> >     <robertwb@google.com <ma...@google.com>> wrote:
>> >      >>>>>>>>
>> >      >>>>>>>> Thanks for starting this investigation. As mentioned, most
>> >     of the work
>> >      >>>>>>>> to date has been on feature parity, not performance
>> >     parity, but we're
>> >      >>>>>>>> at the point that the latter should be tackled as well.
>> >     Even if there
>> >      >>>>>>>> is a slight overhead (and there's talk about integrating
>> >     more deeply
>> >      >>>>>>>> with the Flume DAG that could elide even that) I'd expect
>> >     it should be
>> >      >>>>>>>> nowhere near the 3x that you're seeing. Aside from the
>> >     timer issue,
>> >      >>>>>>>> sounds like the cloning via coders is is a huge drag that
>> >     needs to be
>> >      >>>>>>>> addressed. I wonder if this is one of those cases where
>> >     using the
>> >      >>>>>>>> portability framework could be a performance win
>> >     (specifically, no
>> >      >>>>>>>> cloning would happen between operators of fused stages,
>> >     and the
>> >      >>>>>>>> cloning between operators could be on the raw bytes[] (if
>> >     needed at
>> >      >>>>>>>> all, because we know they wouldn't be mutated).
>> >      >>>>>>>>
>> >      >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles
>> >     <kenn@apache.org <ma...@apache.org>> wrote:
>> >      >>>>>>>>>
>> >      >>>>>>>>> Specifically, a lot of shared code assumes that
>> >     repeatedly setting a timer is nearly free / the same cost as
>> >     determining whether or not to set the timer. ReduceFnRunner has been
>> >     refactored in a way so it would be very easy to set the GC timer
>> >     once per window that occurs in a bundle, but there's probably some
>> >     underlying inefficiency around why this isn't cheap that would be a
>> >     bigger win.
>> >      >>>>>>>>>
>> >      >>>>>>>>> Kenn
>> >      >>>>>>>>>
>> >      >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax
>> >     <relax@google.com <ma...@google.com>> wrote:
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> I think the short answer is that folks working on the
>> >     BeamFlink runner have mostly been focused on getting everything
>> >     working, and so have not dug into this performance too deeply. I
>> >     suspect that there is low-hanging fruit to optimize as a result.
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for
>> >     each element. I think this code dates back to before Beam; on
>> >     Dataflow timers are identified by tag, so this simply overwrites the
>> >     existing timer which is very cheap in Dataflow. If it is not cheap
>> >     on Flink, this might be something to optimize.
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> Reuven
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek
>> >     <jozo.vilcek@gmail.com <ma...@gmail.com>> wrote:
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Hello,
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> I am interested in any knowledge or thoughts on what
>> >     should be / is an overhead of running Beam pipelines instead of
>> >     pipelines written on "bare runner". Is this something which is being
>> >     tested or investigated by community? Is there a consensus in what
>> >     bounds should the overhead typically be? I realise this is very
>> >     runner specific, but certain things are imposed also by SDK model
>> >     itself.
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> I tested simple streaming pipeline on Flink vs
>> >     Beam-Flink and found very noticeable differences. I want to stress
>> >     out, it was not a performance test. Job does following:
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter
>> >     deserialisation errors -> Reshuffle -> Report counter.inc() to
>> >     metrics for throughput
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Both jobs had same configuration and same state backed
>> >     with same checkpointing strategy. What I noticed from few simple
>> >     test runs:
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * first run on Flink 1.5.0 from CPU profiles on one
>> >     worker I have found out that ~50% time was spend either on removing
>> >     timers from HeapInternalTimerService or in
>> >     java.io.ByteArrayOutputStream from CoderUtils.clone()
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * problem with timer delete was addressed by
>> >     FLINK-9423. I have retested on Flink 1.7.2 and there was not much
>> >     time is spend in timer delete now, but root cause was not removed.
>> >     It still remains that timers are frequently registered and removed (
>> >     I believe from ReduceFnRunner.scheduleGarbageCollectionTimer() in
>> >     which case it is called per processed element? )  which is
>> >     noticeable in GC activity, Heap and State ...
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * in Flink I use FileSystem state backed which keeps
>> >     state in memory CopyOnWriteStateTable which after some time is full
>> >     of PaneInfo objects. Maybe they come from PaneInfoTracker activity
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * Coder clone is painfull. Pure Flink job does copy
>> >     between operators too, in my case it is via Kryo.copy() but this is
>> >     not noticeable in CPU profile. Kryo.copy() does copy on object level
>> >     not boject -> bytes -> object which is cheaper
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Overall, my observation is that pure Flink can be
>> >     roughly 3x faster.
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> I do not know what I am trying to achieve here :)
>> >     Probably just start a discussion and collect thoughts and other
>> >     experiences on the cost of running some data processing on Beam and
>> >     particular runner.
>> >      >>>>>>>>>>>
>> >

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Viliam Durina <vi...@hazelcast.com>.
> you MUST NOT mutate your inputs
I think it's enough not to mutate objects after you emit them. It follows
that when you receive an input, the upstream vertex will not mutate it in
parallel. This is what Hazelcast Jet expects; we have no option to
automatically clone objects after each step.
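
To make the rule concrete, here is a minimal sketch in plain Java. The names
(EmitThenMutate, Event, emitReusingInstance, emitFreshInstances) are
hypothetical stand-ins for illustration, not Jet, Beam, or Flink APIs. It
assumes elements are handed downstream by reference, with no copy in between:
an emitter that keeps mutating one instance after emitting it changes what
downstream already holds, while an emitter that never touches an emitted
instance again is safe.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not a Jet, Beam, or Flink API. */
final class EmitThenMutate {

    /** A mutable element that is passed downstream by reference. */
    static final class Event {
        String payload;

        Event(String payload) {
            this.payload = payload;
        }
    }

    /** Unsafe: one instance is emitted and then mutated again for the next input. */
    static List<Event> emitReusingInstance(List<String> inputs) {
        List<Event> emitted = new ArrayList<>();
        Event reused = new Event(null);
        for (String input : inputs) {
            reused.payload = input; // mutates an instance that may already be downstream
            emitted.add(reused);    // downstream receives a reference, not a copy
        }
        return emitted;
    }

    /** Safe: every emitted instance is fresh and never modified after emission. */
    static List<Event> emitFreshInstances(List<String> inputs) {
        List<Event> emitted = new ArrayList<>();
        for (String input : inputs) {
            emitted.add(new Event(input));
        }
        return emitted;
    }
}
```

With inputs "a" and "b", the first variant leaves downstream holding two
references to the same object, so the first element also reads "b". The second
variant keeps "a" and "b" apart without any cloning between steps.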

Viliam


Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Maximilian Michels <mx...@apache.org>.
> I am not sure what you are referring to here. What do you mean by Kryo being simply slower ... Beam Kryo, Flink Kryo, or something else?

Flink uses Kryo as a fallback serializer when its own type serialization
system can't analyze the type. I'm just guessing here that this fallback
could be slower.
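
For reference, the two copy strategies compared in this thread can be sketched
in plain Java as follows. This is only an illustration under assumptions:
CopyStrategies, Reading, cloneViaBytes and copyFields are hypothetical names,
and the code uses neither the Beam Coder API nor Kryo. cloneViaBytes mimics a
coder-style clone (object -> bytes -> object); copyFields mimics an
object-level copy that skips the intermediate byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration only; not the Beam Coder API and not Kryo. */
final class CopyStrategies {

    /** A small immutable element type used for the comparison. */
    static final class Reading {
        final String sensor;
        final long value;

        Reading(String sensor, long value) {
            this.sensor = sensor;
            this.value = value;
        }
    }

    /** Coder-style clone: encode the object to bytes, then decode a new object. */
    static Reading cloneViaBytes(Reading in) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeUTF(in.sensor);
                out.writeLong(in.value);
            }
            try (DataInputStream data =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return new Reading(data.readUTF(), data.readLong());
            }
        } catch (IOException e) {
            // In-memory streams should not fail; rethrow unchecked to keep the signature simple.
            throw new UncheckedIOException(e);
        }
    }

    /** Object-level copy: build the new object field by field, with no byte array. */
    static Reading copyFields(Reading in) {
        return new Reading(in.sensor, in.value);
    }
}
```

Both methods return an equal but distinct instance. The byte round trip
additionally pays for encoding, buffer allocation, and decoding on every
element, which is where the CPU profile quoted in this thread showed time
being spent (java.io.ByteArrayOutputStream under CoderUtils.clone()).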


Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Jozef Vilcek <jo...@gmail.com>.
On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <mx...@apache.org> wrote:

> Thanks for the JIRA issues Jozef!
>
> > So the feature in Flink is operator chaining, and Flink by default
> > copies input elements. With Beam coders the copy seems to be more
> > noticeable than in native Flink.
>
> Copying between chained operators can be turned off in the
> FlinkPipelineOptions (if you know what you're doing).


Yes, I know that it can be instructed to reuse objects (if you are
referring to this). I am just not sure I want to open this door in general
:)
But it is interesting to learn that, with portability, this will be turned
on by default. Quite an important finding imho.


> Beam coders should
> not be slower than Flink's. They are simply wrapped. It seems Kryo is
> simply slower, which we could fix by providing more type hints to Flink.
>

I am not sure what you are referring to here. What do you mean by Kryo being
simply slower ... Beam Kryo, Flink Kryo, or something else?


> -Max
>
> On 02.05.19 13:15, Robert Bradshaw wrote:
> > Thanks for filing those.
> >
> > As for how not doing a copy is "safe," it's not really. Beam simply
> > asserts that you MUST NOT mutate your inputs (and direct runners,
> > which are used during testing, do perform extra copies and checks to
> > catch violations of this requirement).
> >
> > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <jo...@gmail.com> wrote:
> >>
> >> I have created
> >> https://issues.apache.org/jira/browse/BEAM-7204
> >> https://issues.apache.org/jira/browse/BEAM-7206
> >>
> >> to track these topics further
> >>
> >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <jo...@gmail.com> wrote:
> >>>
> >>>
> >>>
> >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <ke...@apache.org> wrote:
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
> >>>>>
> >>>>> In that case, Robert's point is quite valid. The old Flink runner I
> >>>>> believe had no knowledge of fusion, which was known to make it extremely
> >>>>> slow. A lot of work went into making the portable runner fusion aware, so
> >>>>> we don't need to round trip through coders on every ParDo.
> >>>>
> >>>>
> >>>> The old Flink runner got fusion for free, since Flink does it. The
> >>>> new fusion in portability is because fusing the runner side of portability
> >>>> steps does not achieve real fusion.
> >>>
> >>>
> >>> Aha, I see. So the feature in Flink is operator chaining, and Flink by
> >>> default copies input elements. With Beam coders the copy seems to be
> >>> more noticeable than in native Flink.
> >>> So do I get it right that in the portable runner scenario, you do similar
> >>> chaining via this "fusion of stages"? Curious here... how is it different
> >>> from chaining, so that the runner can be sure that not doing a copy is
> >>> "safe" with respect to user-defined functions and their behaviour over inputs?
> >>>
> >>>>>
> >>>>>
> >>>>> Reuven
> >>>>>
> >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jo...@gmail.com> wrote:
> >>>>>>
> >>>>>> It was not a portable Flink runner.
> >>>>>>
> >>>>>> Thanks all for the thoughts, I will create JIRAs, as suggested,
> >>>>>> with my findings and send them out
> >>>>>>
> >>>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
> >>>>>>>
> >>>>>>> Jozef did you use the portable Flink runner or the old one?
> >>>>>>>
> >>>>>>> Reuven
> >>>>>>>
> >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <robertwb@google.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for starting this investigation. As mentioned, most of the
> work
> >>>>>>>> to date has been on feature parity, not performance parity, but
> we're
> >>>>>>>> at the point that the latter should be tackled as well. Even if
> there
> >>>>>>>> is a slight overhead (and there's talk about integrating more
> deeply
> >>>>>>>> with the Flume DAG that could elide even that) I'd expect it
> should be
> >>>>>>>> nowhere near the 3x that you're seeing. Aside from the timer
> issue,
> >>>>>>>> sounds like the cloning via coders is a huge drag that needs
> to be
> >>>>>>>> addressed. I wonder if this is one of those cases where using the
> >>>>>>>> portability framework could be a performance win (specifically, no
> >>>>>>>> cloning would happen between operators of fused stages, and the
> >>>>>>>> cloning between operators could be on the raw bytes[] (if needed
> at
> >>>>>>>> all, because we know they wouldn't be mutated)).
> >>>>>>>>
> >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <ke...@apache.org>
> wrote:
> >>>>>>>>>
> >>>>>>>>> Specifically, a lot of shared code assumes that repeatedly
> setting a timer is nearly free / the same cost as determining whether or
> not to set the timer. ReduceFnRunner has been refactored in a way so it
> would be very easy to set the GC timer once per window that occurs in a
> bundle, but there's probably some underlying inefficiency around why this
> isn't cheap that would be a bigger win.
> >>>>>>>>>
> >>>>>>>>> Kenn
> >>>>>>>>>
> >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> I think the short answer is that folks working on the BeamFlink
> runner have mostly been focused on getting everything working, and so have
> not dug into this performance too deeply. I suspect that there is
> low-hanging fruit to optimize as a result.
> >>>>>>>>>>
> >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for each
> element. I think this code dates back to before Beam; on Dataflow timers
> are identified by tag, so this simply overwrites the existing timer which
> is very cheap in Dataflow. If it is not cheap on Flink, this might be
> something to optimize.
> >>>>>>>>>>
> >>>>>>>>>> Reuven
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <
> jozo.vilcek@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> I am interested in any knowledge or thoughts on what should be
> / is an overhead of running Beam pipelines instead of pipelines written on
> "bare runner". Is this something which is being tested or investigated by
> community? Is there a consensus in what bounds should the overhead
> typically be? I realise this is very runner specific, but certain things
> are imposed also by SDK model itself.
> >>>>>>>>>>>
> >>>>>>>>>>> I tested a simple streaming pipeline on Flink vs Beam-Flink and
> found very noticeable differences. I want to stress that it was not a
> performance test. The job does the following:
> >>>>>>>>>>>
> >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter deserialisation
> errors -> Reshuffle -> Report counter.inc() to metrics for throughput
> >>>>>>>>>>>
> >>>>>>>>>>> Both jobs had the same configuration and the same state backend
> with the same checkpointing strategy. What I noticed from a few simple test
> runs:
> >>>>>>>>>>>
> >>>>>>>>>>> * On the first run on Flink 1.5.0, CPU profiles on one worker
> showed that ~50% of the time was spent either on removing timers from
> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
> CoderUtils.clone()
> >>>>>>>>>>>
> >>>>>>>>>>> * The problem with timer deletion was addressed by FLINK-9423. I
> have retested on Flink 1.7.2 and not much time is spent in timer deletion
> now, but the root cause was not removed. It still remains that timers
> are frequently registered and removed ( I believe from
> ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called
> per processed element? ), which is noticeable in GC activity, Heap and
> State ...
> >>>>>>>>>>>
> >>>>>>>>>>> * In Flink I use the FileSystem state backend, which keeps state
> in an in-memory CopyOnWriteStateTable that after some time is full of
> PaneInfo objects. Maybe they come from PaneInfoTracker activity
> >>>>>>>>>>>
> >>>>>>>>>>> * Coder clone is painful. A pure Flink job does copy between
> operators too, in my case via Kryo.copy(), but this is not noticeable
> in the CPU profile. Kryo.copy() copies on the object level, not object ->
> bytes -> object, which is cheaper
> >>>>>>>>>>>
> >>>>>>>>>>> Overall, my observation is that pure Flink can be roughly 3x
> faster.
> >>>>>>>>>>>
> >>>>>>>>>>> I do not know what I am trying to achieve here :) Probably
> just start a discussion and collect thoughts and other experiences on the
> cost of running some data processing on Beam and particular runner.
> >>>>>>>>>>>
>

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for the JIRA issues Jozef!

> So the feature in Flink is operator chaining, and Flink by default copies input elements. In the case of Beam, the copy via coders seems to be more noticeable than in native Flink.

Copying between chained operators can be turned off in the
FlinkPipelineOptions (if you know what you're doing). Beam coders should
not be slower than Flink's. They are simply wrapped. It seems Kryo is
simply slower, which we could fix by providing more type hints to Flink.
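
To make the two copy styles discussed in this thread concrete, here is a
minimal sketch in plain Java. It is not Beam, Flink or Kryo code: the class
CopySketch, the Event type and both method names are made up for
illustration, and Java serialization only stands in for a coder. It contrasts
a copy that goes object -> bytes -> object with a copy done directly on the
object level.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopySketch {
    // Hypothetical element type standing in for a deserialized record.
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String key;
        final List<Integer> values;

        Event(String key, List<Integer> values) {
            this.key = key;
            this.values = values;
        }
    }

    // Copy by encoding to bytes and decoding again: object -> bytes -> object.
    // Java serialization is only a stand-in for a coder here.
    static Event cloneViaBytes(Event in) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(in);
            }
            try (ObjectInputStream is =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (Event) is.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Copy on the object level: no intermediate byte buffer is allocated.
    static Event cloneOnObjectLevel(Event in) {
        return new Event(in.key, new ArrayList<>(in.values));
    }

    public static void main(String[] args) {
        Event original = new Event("user-1", new ArrayList<>(Arrays.asList(1, 2, 3)));
        Event viaBytes = cloneViaBytes(original);
        Event viaObject = cloneOnObjectLevel(original);
        // Both copies are distinct objects that carry equal contents.
        System.out.println(viaBytes.values.equals(original.values));
        System.out.println(viaObject.values.equals(original.values));
        System.out.println(viaBytes != original && viaObject != original);
    }
}
```

Both methods produce an independent copy; the difference is that the first
one allocates and fills a byte buffer for every element, which is the kind of
work that shows up as java.io.ByteArrayOutputStream time in a CPU profile.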

-Max

On 02.05.19 13:15, Robert Bradshaw wrote:
> Thanks for filing those.
> 
> As for how not doing a copy is "safe," it's not really. Beam simply
> asserts that you MUST NOT mutate your inputs (and direct runners,
> which are used during testing, do perform extra copies and checks to
> catch violations of this requirement).
> 
> On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <jo...@gmail.com> wrote:
>>
>> I have created
>> https://issues.apache.org/jira/browse/BEAM-7204
>> https://issues.apache.org/jira/browse/BEAM-7206
>>
>> to track these topics further
>>
>> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <jo...@gmail.com> wrote:
>>>
>>>
>>>
>>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>
>>>>
>>>>
>>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>> In that case, Robert's point is quite valid. The old Flink runner I believe had no knowledge of fusion, which was known to make it extremely slow. A lot of work went into making the portable runner fusion aware, so we don't need to round trip through coders on every ParDo.
>>>>
>>>>
>>>> The old Flink runner got fusion for free, since Flink does it. The new fusion in portability is because fusing the runner side of portability steps does not achieve real fusion
>>>
>>>
>>> Aha, I see. So the feature in Flink is operator chaining, and Flink by default copies input elements. In the case of Beam, the copy via coders seems to be more noticeable than in native Flink.
>>> So do I get it right that in the portable runner scenario, you do similar chaining via this "fusion of stages"? Curious here... how is it different from chaining, so that the runner can be sure that not copying is "safe" with respect to user-defined functions and their behaviour over inputs?
>>>
>>>>>
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>>>>
>>>>>> It was not a portable Flink runner.
>>>>>>
>>>>>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my findings and send them out
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>> Jozef did you use the portable Flink runner or the old one?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>>
>>>>>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>>>>>> to date has been on feature parity, not performance parity, but we're
>>>>>>>> at the point that the latter should be tackled as well. Even if there
>>>>>>>> is a slight overhead (and there's talk about integrating more deeply
>>>>>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>>>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>>>>>> addressed. I wonder if this is one of those cases where using the
>>>>>>>> portability framework could be a performance win (specifically, no
>>>>>>>> cloning would happen between operators of fused stages, and the
>>>>>>>> cloning between operators could be on the raw bytes[] (if needed at
>>>>>>>> all, because we know they wouldn't be mutated)).
>>>>>>>>
>>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> Specifically, a lot of shared code assumes that repeatedly setting a timer is nearly free / the same cost as determining whether or not to set the timer. ReduceFnRunner has been refactored in a way so it would be very easy to set the GC timer once per window that occurs in a bundle, but there's probably some underlying inefficiency around why this isn't cheap that would be a bigger win.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I think the short answer is that folks working on the BeamFlink runner have mostly been focused on getting everything working, and so have not dug into this performance too deeply. I suspect that there is low-hanging fruit to optimize as a result.
>>>>>>>>>>
>>>>>>>>>> You're right that ReduceFnRunner schedules a timer for each element. I think this code dates back to before Beam; on Dataflow timers are identified by tag, so this simply overwrites the existing timer which is very cheap in Dataflow. If it is not cheap on Flink, this might be something to optimize.
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I am interested in any knowledge or thoughts on what should be / is an overhead of running Beam pipelines instead of pipelines written on "bare runner". Is this something which is being tested or investigated by community? Is there a consensus in what bounds should the overhead typically be? I realise this is very runner specific, but certain things are imposed also by SDK model itself.
>>>>>>>>>>>
>>>>>>>>>>> I tested a simple streaming pipeline on Flink vs Beam-Flink and found very noticeable differences. I want to stress that it was not a performance test. The job does the following:
>>>>>>>>>>>
>>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors -> Reshuffle -> Report counter.inc() to metrics for throughput
>>>>>>>>>>>
>>>>>>>>>>> Both jobs had the same configuration and the same state backend with the same checkpointing strategy. What I noticed from a few simple test runs:
>>>>>>>>>>>
>>>>>>>>>>> * On the first run on Flink 1.5.0, CPU profiles on one worker showed that ~50% of the time was spent either on removing timers from HeapInternalTimerService or in java.io.ByteArrayOutputStream from CoderUtils.clone()
>>>>>>>>>>>
>>>>>>>>>>> * The problem with timer deletion was addressed by FLINK-9423. I have retested on Flink 1.7.2 and not much time is spent in timer deletion now, but the root cause was not removed. It still remains that timers are frequently registered and removed ( I believe from ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called per processed element? ), which is noticeable in GC activity, Heap and State ...
>>>>>>>>>>>
>>>>>>>>>>> * In Flink I use the FileSystem state backend, which keeps state in an in-memory CopyOnWriteStateTable that after some time is full of PaneInfo objects. Maybe they come from PaneInfoTracker activity
>>>>>>>>>>>
>>>>>>>>>>> * Coder clone is painful. A pure Flink job does copy between operators too, in my case via Kryo.copy(), but this is not noticeable in the CPU profile. Kryo.copy() copies on the object level, not object -> bytes -> object, which is cheaper
>>>>>>>>>>>
>>>>>>>>>>> Overall, my observation is that pure Flink can be roughly 3x faster.
>>>>>>>>>>>
>>>>>>>>>>> I do not know what I am trying to achieve here :) Probably just start a discussion and collect thoughts and other experiences on the cost of running some data processing on Beam and particular runner.
>>>>>>>>>>>

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Robert Bradshaw <ro...@google.com>.
Thanks for filing those.

As for how not doing a copy is "safe," it's not really. Beam simply
asserts that you MUST NOT mutate your inputs (and direct runners,
which are used during testing, do perform extra copies and checks to
catch violations of this requirement).
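
As a rough illustration of what such a check can look like, here is a minimal
sketch in plain Java. It is not the direct runner's actual code: the class
MutationCheckSketch and the methods encode and mutatesInput are made up for
this example, and the hand-written encoding only stands in for a coder. The
idea is to encode the input before and after the user function runs and
compare the bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class MutationCheckSketch {
    // Hypothetical stand-in for a coder: length-prefixed encoding of the element.
    static byte[] encode(List<String> element) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeInt(element.size());
                for (String s : element) {
                    out.writeUTF(s);
                }
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the user function changed the encoded form of its input.
    static boolean mutatesInput(List<String> element, Consumer<List<String>> userFn) {
        byte[] before = encode(element);
        userFn.accept(element);
        byte[] after = encode(element);
        return !Arrays.equals(before, after);
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>(Arrays.asList("a", "b"));
        // A function that only reads its input passes the check.
        System.out.println(mutatesInput(input, in -> in.size()));
        // A function that appends to its input is flagged.
        System.out.println(mutatesInput(input, in -> in.add("c")));
    }
}
```

A check like this costs two extra encodes per element, which is acceptable in
a test runner but is exactly the kind of overhead a production runner wants
to avoid.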

On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <jo...@gmail.com> wrote:
>
> I have created
> https://issues.apache.org/jira/browse/BEAM-7204
> https://issues.apache.org/jira/browse/BEAM-7206
>
> to track these topics further
>
> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <jo...@gmail.com> wrote:
>>
>>
>>
>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>
>>>
>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
>>>>
>>>> In that case, Robert's point is quite valid. The old Flink runner I believe had no knowledge of fusion, which was known to make it extremely slow. A lot of work went into making the portable runner fusion aware, so we don't need to round trip through coders on every ParDo.
>>>
>>>
>>> The old Flink runner got fusion for free, since Flink does it. The new fusion in portability is because fusing the runner side of portability steps does not achieve real fusion
>>
>>
>> Aha, I see. So the feature in Flink is operator chaining, and Flink by default copies input elements. In the case of Beam, the copy via coders seems to be more noticeable than in native Flink.
>> So do I get it right that in the portable runner scenario, you do similar chaining via this "fusion of stages"? Curious here... how is it different from chaining, so that the runner can be sure that not copying is "safe" with respect to user-defined functions and their behaviour over inputs?
>>
>>>>
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>>>
>>>>> It was not a portable Flink runner.
>>>>>
>>>>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my findings and send them out
>>>>>
>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>> Jozef did you use the portable Flink runner or the old one?
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>
>>>>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>>>>> to date has been on feature parity, not performance parity, but we're
>>>>>>> at the point that the latter should be tackled as well. Even if there
>>>>>>> is a slight overhead (and there's talk about integrating more deeply
>>>>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>>>>> addressed. I wonder if this is one of those cases where using the
>>>>>>> portability framework could be a performance win (specifically, no
>>>>>>> cloning would happen between operators of fused stages, and the
>>>>>>> cloning between operators could be on the raw bytes[] (if needed at
>>>>>>> all, because we know they wouldn't be mutated)).
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>>>>> >
>>>>>>> > Specifically, a lot of shared code assumes that repeatedly setting a timer is nearly free / the same cost as determining whether or not to set the timer. ReduceFnRunner has been refactored in a way so it would be very easy to set the GC timer once per window that occurs in a bundle, but there's probably some underlying inefficiency around why this isn't cheap that would be a bigger win.
>>>>>>> >
>>>>>>> > Kenn
>>>>>>> >
>>>>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:
>>>>>>> >>
>>>>>>> >> I think the short answer is that folks working on the BeamFlink runner have mostly been focused on getting everything working, and so have not dug into this performance too deeply. I suspect that there is low-hanging fruit to optimize as a result.
>>>>>>> >>
>>>>>>> >> You're right that ReduceFnRunner schedules a timer for each element. I think this code dates back to before Beam; on Dataflow timers are identified by tag, so this simply overwrites the existing timer which is very cheap in Dataflow. If it is not cheap on Flink, this might be something to optimize.
>>>>>>> >>
>>>>>>> >> Reuven
>>>>>>> >>
>>>>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jo...@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> Hello,
>>>>>>> >>>
>>>>>>> >>> I am interested in any knowledge or thoughts on what should be / is an overhead of running Beam pipelines instead of pipelines written on "bare runner". Is this something which is being tested or investigated by community? Is there a consensus in what bounds should the overhead typically be? I realise this is very runner specific, but certain things are imposed also by SDK model itself.
>>>>>>> >>>
>>>>>>> >>> I tested a simple streaming pipeline on Flink vs Beam-Flink and found very noticeable differences. I want to stress that it was not a performance test. The job does the following:
>>>>>>> >>>
>>>>>>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors -> Reshuffle -> Report counter.inc() to metrics for throughput
>>>>>>> >>>
>>>>>>> >>> Both jobs had the same configuration and the same state backend with the same checkpointing strategy. What I noticed from a few simple test runs:
>>>>>>> >>>
>>>>>>> >>> * On the first run on Flink 1.5.0, CPU profiles on one worker showed that ~50% of the time was spent either on removing timers from HeapInternalTimerService or in java.io.ByteArrayOutputStream from CoderUtils.clone()
>>>>>>> >>>
>>>>>>> >>> * The problem with timer deletion was addressed by FLINK-9423. I have retested on Flink 1.7.2 and not much time is spent in timer deletion now, but the root cause was not removed. It still remains that timers are frequently registered and removed ( I believe from ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called per processed element? ), which is noticeable in GC activity, Heap and State ...
>>>>>>> >>>
>>>>>>> >>> * In Flink I use the FileSystem state backend, which keeps state in an in-memory CopyOnWriteStateTable that after some time is full of PaneInfo objects. Maybe they come from PaneInfoTracker activity
>>>>>>> >>>
>>>>>>> >>> * Coder clone is painful. A pure Flink job does copy between operators too, in my case via Kryo.copy(), but this is not noticeable in the CPU profile. Kryo.copy() copies on the object level, not object -> bytes -> object, which is cheaper
>>>>>>> >>>
>>>>>>> >>> Overall, my observation is that pure Flink can be roughly 3x faster.
>>>>>>> >>>
>>>>>>> >>> I do not know what I am trying to achieve here :) Probably just start a discussion and collect thoughts and other experiences on the cost of running some data processing on Beam and particular runner.
>>>>>>> >>>

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

Posted by Jozef Vilcek <jo...@gmail.com>.
I have created
https://issues.apache.org/jira/browse/BEAM-7204
https://issues.apache.org/jira/browse/BEAM-7206

to track these topics further

On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <jo...@gmail.com> wrote:

>
>
> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>>
>>
>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
>>
>>> In that case, Robert's point is quite valid. The old Flink runner I
>>> believe had no knowledge of fusion, which was known to make it extremely
>>> slow. A lot of work went into making the portable runner fusion aware, so
>>> we don't need to round trip through coders on every ParDo.
>>>
>>
>> The old Flink runner got fusion for free, since Flink does it. The new
>> fusion in portability is because fusing the runner side of portability
>> steps does not achieve real fusion
>>
>
> Aha, I see. So the feature in Flink is operator chaining, and Flink by
> default copies input elements. In the case of Beam, the copy via coders
> seems to be more noticeable than in native Flink.
> So do I get it right that in the portable runner scenario, you do similar
> chaining via this "fusion of stages"? Curious here... how is it different
> from chaining, so that the runner can be sure that not copying is "safe" with
> respect to user-defined functions and their behaviour over inputs?
>
>
>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jo...@gmail.com>
>>> wrote:
>>>
>>>> It was not a portable Flink runner.
>>>>
>>>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my
>>>> findings and send them out
>>>>
>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Jozef did you use the portable Flink runner or the old one?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>>>> to date has been on feature parity, not performance parity, but we're
>>>>>> at the point that the latter should be tackled as well. Even if there
>>>>>> is a slight overhead (and there's talk about integrating more deeply
>>>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>>>> addressed. I wonder if this is one of those cases where using the
>>>>>> portability framework could be a performance win (specifically, no
>>>>>> cloning would happen between operators of fused stages, and the
>>>>>> cloning between operators could be on the raw bytes[] (if needed at
>>>>>> all, because we know they wouldn't be mutated)).
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <ke...@apache.org>
>>>>>> wrote:
>>>>>> >
>>>>>> > Specifically, a lot of shared code assumes that repeatedly setting
>>>>>> a timer is nearly free / the same cost as determining whether or not to set
>>>>>> the timer. ReduceFnRunner has been refactored in a way so it would be very
>>>>>> easy to set the GC timer once per window that occurs in a bundle, but
>>>>>> there's probably some underlying inefficiency around why this isn't cheap
>>>>>> that would be a bigger win.
>>>>>> >
>>>>>> > Kenn
>>>>>> >
>>>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> I think the short answer is that folks working on the BeamFlink
>>>>>> runner have mostly been focused on getting everything working, and so have
>>>>>> not dug into this performance too deeply. I suspect that there is
>>>>>> low-hanging fruit to optimize as a result.
>>>>>> >>
>>>>>> >> You're right that ReduceFnRunner schedules a timer for each
>>>>>> element. I think this code dates back to before Beam; on Dataflow timers
>>>>>> are identified by tag, so this simply overwrites the existing timer which
>>>>>> is very cheap in Dataflow. If it is not cheap on Flink, this might be
>>>>>> something to optimize.
>>>>>> >>
>>>>>> >> Reuven
>>>>>> >>
>>>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <
>>>>>> jozo.vilcek@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Hello,
>>>>>> >>>
>>>>>> >>> I am interested in any knowledge or thoughts on what should be /
>>>>>> is an overhead of running Beam pipelines instead of pipelines written on
>>>>>> "bare runner". Is this something which is being tested or investigated by
>>>>>> community? Is there a consensus in what bounds should the overhead
>>>>>> typically be? I realise this is very runner specific, but certain things
>>>>>> are imposed also by SDK model itself.
>>>>>> >>>
>>>>>> >>> I tested a simple streaming pipeline on Flink vs Beam-Flink and
>>>>>> found very noticeable differences. I want to stress that it was not a
>>>>>> performance test. The job does the following:
>>>>>> >>>
>>>>>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation
>>>>>> errors -> Reshuffle -> Report counter.inc() to metrics for throughput
>>>>>> >>>
>>>>>> >>> Both jobs had the same configuration and the same state backend
>>>>>> with the same checkpointing strategy. What I noticed from a few simple
>>>>>> test runs:
>>>>>> >>>
>>>>>> >>> * On the first run on Flink 1.5.0, CPU profiles on one worker
>>>>>> showed that ~50% of the time was spent either on removing timers from
>>>>>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>>>>>> CoderUtils.clone()
>>>>>> >>>
>>>>>> >>> * The problem with timer deletion was addressed by FLINK-9423. I
>>>>>> have retested on Flink 1.7.2 and not much time is spent in timer deletion
>>>>>> now, but the root cause was not removed. It still remains that timers
>>>>>> are frequently registered and removed ( I believe from
>>>>>> ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called
>>>>>> per processed element? ), which is noticeable in GC activity, Heap and
>>>>>> State ...
>>>>>> >>>
>>>>>> >>> * In Flink I use the FileSystem state backend, which keeps state
>>>>>> in an in-memory CopyOnWriteStateTable that after some time is full of
>>>>>> PaneInfo objects. Maybe they come from PaneInfoTracker activity
>>>>>> >>>
>>>>>> >>> * Coder clone is painful. A pure Flink job does copy between
>>>>>> operators too, in my case via Kryo.copy(), but this is not noticeable
>>>>>> in the CPU profile. Kryo.copy() copies on the object level, not object ->
>>>>>> bytes -> object, which is cheaper
>>>>>> >>>
>>>>>> >>> Overall, my observation is that pure Flink can be roughly 3x
>>>>>> faster.
>>>>>> >>>
>>>>>> >>> I do not know what I am trying to achieve here :) Probably just
>>>>>> start a discussion and collect thoughts and other experiences on the cost
>>>>>> of running some data processing on Beam and particular runner.
>>>>>> >>>
>>>>>>
>>>>>