You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Teodor Spæren <te...@riseup.net> on 2020/08/28 20:13:45 UTC

Design rational behind copying via serializing in flink runner

Hey!

First time posting to a mailing list, hope I did it correctly :)

I'm writing a master thesis at the University of Oslo and right now I'm 
looking at the performance overhead of using Beam with the Flink runnner 
versus plain Flink.

I've written a simple program, a custom source outputing 0, 1, 2, 3, up 
to N, going into a single identity operator and then int a filter which 
only matches N and prints that out. This is just to compare performance.

I've been doing some profiling of simple programs and one observation is 
the performance difference in the serialization. The hotspot is [1], 
which is used multiple places, but one place is [2], which is called 
from [3]. As far as I can tell, [1] seems to be implementing copying by 
first serializing and then deserializing and there are no way for the 
actual types to change this. In flink, you have control over the copy() 
method, like in [4] and so for certain types you can just do a simple 
return as you do here.

My queston is if I've understood the flow correctly so far and if so 
what the reason for doing it this way. Is it to avoid demanding that the 
type implement some type of cloning? And would it be possible to push 
this downward in the stack and allow the encoders to do define the copy 
schemantics? I'm willing to do the work here, just want to know if it 
would work on an arcitectural level.

If there is any known overheads of using beam that you would like to 
point out, I would love to hear about it.

Best regards,
Teodor Spæren

[1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140
[2]: https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85
[3]: https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85
[4]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53

Re: Design rational behind copying via serializing in flink runner

Posted by Luke Cwik <lc...@google.com>.
The idea in Beam has always been to make objects passed between transforms
to not be mutable and require users to either pass through their object or
output a new object.

Minimizing the places where Flink performs copying would be best but
without further investigation wouldn't be able to give concrete suggestions.

On Fri, Aug 28, 2020 at 1:41 PM Brian Hulette <bh...@google.com> wrote:

> I don't have any specific insight here, but this seems like a good
> discussion for +dev <de...@beam.apache.org>
>
> Brian
>
> On Fri, Aug 28, 2020 at 1:14 PM Teodor Spæren <te...@riseup.net>
> wrote:
>
>> Hey!
>>
>> First time posting to a mailing list, hope I did it correctly :)
>>
>> I'm writing a master thesis at the University of Oslo and right now I'm
>> looking at the performance overhead of using Beam with the Flink runnner
>> versus plain Flink.
>>
>> I've written a simple program, a custom source outputing 0, 1, 2, 3, up
>> to N, going into a single identity operator and then int a filter which
>> only matches N and prints that out. This is just to compare performance.
>>
>> I've been doing some profiling of simple programs and one observation is
>> the performance difference in the serialization. The hotspot is [1],
>> which is used multiple places, but one place is [2], which is called
>> from [3]. As far as I can tell, [1] seems to be implementing copying by
>> first serializing and then deserializing and there are no way for the
>> actual types to change this. In flink, you have control over the copy()
>> method, like in [4] and so for certain types you can just do a simple
>> return as you do here.
>>
>> My queston is if I've understood the flow correctly so far and if so
>> what the reason for doing it this way. Is it to avoid demanding that the
>> type implement some type of cloning? And would it be possible to push
>> this downward in the stack and allow the encoders to do define the copy
>> schemantics? I'm willing to do the work here, just want to know if it
>> would work on an arcitectural level.
>>
>> If there is any known overheads of using beam that you would like to
>> point out, I would love to hear about it.
>>
>> Best regards,
>> Teodor Spæren
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140
>> [2]:
>> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85
>> [3]:
>> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85
>> [4]:
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53
>>
>

Re: Design rational behind copying via serializing in flink runner

Posted by Brian Hulette <bh...@google.com>.
I don't have any specific insight here, but this seems like a good
discussion for +dev <de...@beam.apache.org>

Brian

On Fri, Aug 28, 2020 at 1:14 PM Teodor Spæren <te...@riseup.net>
wrote:

> Hey!
>
> First time posting to a mailing list, hope I did it correctly :)
>
> I'm writing a master thesis at the University of Oslo and right now I'm
> looking at the performance overhead of using Beam with the Flink runnner
> versus plain Flink.
>
> I've written a simple program, a custom source outputing 0, 1, 2, 3, up
> to N, going into a single identity operator and then int a filter which
> only matches N and prints that out. This is just to compare performance.
>
> I've been doing some profiling of simple programs and one observation is
> the performance difference in the serialization. The hotspot is [1],
> which is used multiple places, but one place is [2], which is called
> from [3]. As far as I can tell, [1] seems to be implementing copying by
> first serializing and then deserializing and there are no way for the
> actual types to change this. In flink, you have control over the copy()
> method, like in [4] and so for certain types you can just do a simple
> return as you do here.
>
> My queston is if I've understood the flow correctly so far and if so
> what the reason for doing it this way. Is it to avoid demanding that the
> type implement some type of cloning? And would it be possible to push
> this downward in the stack and allow the encoders to do define the copy
> schemantics? I'm willing to do the work here, just want to know if it
> would work on an arcitectural level.
>
> If there is any known overheads of using beam that you would like to
> point out, I would love to hear about it.
>
> Best regards,
> Teodor Spæren
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140
> [2]:
> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85
> [3]:
> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85
> [4]:
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53
>