Posted to dev@beam.apache.org by Brian Hulette <bh...@google.com> on 2020/09/01 00:36:56 UTC

Re: Design rationale behind copying via serializing in the Flink runner

Hi Teodor,
I actually forwarded your message to dev@ before, but I foolishly
removed user@ from the distro, so I think you weren't able to see it. Sorry
about that!

+Lukasz Cwik <lc...@google.com> replied there [1]. I'll copy it here and we
can keep discussing on this thread:

The idea in Beam has always been to make objects passed between transforms
immutable and to require users to either pass through their object or
output a new object.

Minimizing the places where Flink performs copying would be best, but
without further investigation I wouldn't be able to give concrete suggestions.


Brian

[1]
https://lists.apache.org/thread.html/r8c22c8b089f9caaac8efef90e62117a1db49af6471ff6bd7cbc5b882%40%3Cdev.beam.apache.org%3E

On Mon, Aug 31, 2020 at 11:14 AM Teodor Spæren <te...@riseup.net>
wrote:

> Hey!
>
> First time posting to a mailing list, hope I did it correctly :)
>
> I'm writing a master's thesis at the University of Oslo, and right now I'm
> looking at the performance overhead of using Beam with the Flink runner
> versus plain Flink.
>
> I've written a simple program: a custom source outputting 0, 1, 2, 3, up
> to N, going into a single identity operator and then into a filter which
> only matches N and prints it out. This is just to compare performance.
>
> I've been doing some profiling of simple programs, and one observation is
> the performance difference in the serialization. The hotspot is [1],
> which is used in multiple places; one place is [2], which is called
> from [3]. As far as I can tell, [1] seems to implement copying by
> first serializing and then deserializing, and there is no way for the
> actual types to change this. In Flink, you have control over the copy()
> method, as in [4], so for certain types you can simply return the value,
> as is done there.
>
> My question is whether I've understood the flow correctly so far and, if so,
> what the reason for doing it this way is. Is it to avoid demanding that the
> type implement some kind of cloning? And would it be possible to push
> this down the stack and allow the coders to define the copy
> semantics? I'm willing to do the work here, I just want to know if it
> would work on an architectural level.
>
> If there are any known overheads of using Beam that you would like to
> point out, I would love to hear about them.
>
> Best regards,
> Teodor Spæren
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140
> [2]:
> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85
> [3]:
> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85
> [4]:
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53
>
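
For anyone following the links in the quoted message: below is a minimal
sketch of the two copy strategies being compared. It is illustrative only,
not the actual Beam or Flink source, and the class and method names are
invented for the example. The first method mirrors the
serialize-then-deserialize clone in [1]; the second mirrors the identity
copy that Flink's LongSerializer uses in [4].

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.beam.sdk.coders.Coder;

    class CopyStrategies {
      // Copy by round-tripping through the coder: encode to a byte array,
      // then decode the bytes back into a fresh object.
      static <T> T copyViaCoder(Coder<T> coder, T value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(value, out);
        return coder.decode(new ByteArrayInputStream(out.toByteArray()));
      }

      // Copy of an immutable value: no serialization at all, just hand back
      // the same instance, as Flink's LongSerializer.copy() does.
      static Long copyImmutable(Long value) {
        return value;
      }
    }

The cost gap between these two paths is exactly the serialization round
trip that shows up in the profile.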

Re: Design rationale behind copying via serializing in the Flink runner

Posted by Maximilian Michels <mx...@apache.org>.
Hey Teodor,

Copying is the default behavior. This is tunable via the pipeline option 
'objectReuse', i.e. 'objectReuse=true'.

The option is disabled by default because users may not be aware of 
object reuse and may recycle objects in their process functions, which 
will have unexpected side effects.
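
As a rough sketch, the option can be supplied like any other pipeline
option; the '--objectReuse=true' flag is the one named above, the rest is
a generic Beam setup and assumes the Flink runner is on the classpath:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ObjectReuseExample {
      public static void main(String[] args) {
        // Enable Flink object reuse; with it off (the default), the runner
        // copies elements passed between chained operators.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs("--runner=FlinkRunner", "--objectReuse=true")
                .create();
        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline and call pipeline.run() as usual ...
      }
    }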

Now, for primitive types, the copying is not even necessary, and this 
could be optimized, similarly to what is done in the Flink serializers. 
Since we wrap Beam coders and expose them as Flink serializers for the 
Flink Runner, we would have to re-add this logic to the Beam coders or the 
Flink Runner's CoderTypeSerializer.
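
A rough sketch of what such a short-circuit could look like in a wrapper
around a Beam coder; note that 'isKnownImmutable' is an invented helper
shown only to make the idea concrete, not an existing Beam or Flink API,
and a real change would need coders to declare this property themselves:

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.util.CoderUtils;

    class ShortCircuitCopy<T> {
      private final Coder<T> coder;

      ShortCircuitCopy(Coder<T> coder) {
        this.coder = coder;
      }

      T copy(T value) throws CoderException {
        if (isKnownImmutable(coder)) {
          // Values of this type are immutable, so handing back the same
          // instance is safe and avoids the encode/decode round trip.
          return value;
        }
        // Fall back to the existing serialize-then-deserialize clone.
        return CoderUtils.clone(coder, value);
      }

      private static boolean isKnownImmutable(Coder<?> coder) {
        // Hypothetical allow-list, purely for illustration.
        return coder instanceof VarLongCoder;
      }
    }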

-Max

On 06.09.20 11:01, Teodor Spæren wrote:
> Hey Brian!
> 
> Sorry for the late reply, this one kind of got lost in my mail client. 
> Still trying to figure this mailing list thing out, hehe.
> 
> I would like to try to see if a simple return there will speed things 
> up. I've never built Beam by hand, though; is a full build, as described 
> in [1], required to test this, or can I do a more selective build of 
> only the Java portion?
> 
> Another question: Flink has object reuse, and it can be turned on 
> through options to the Flink runner. If everything passed between 
> operators is immutable anyway, why isn't this option enabled by default? 
> In this case I don't think it would give a speed-up, as the method is 
> just aliased [2]. I haven't fully understood all the aspects of this 
> option in Flink, so I might just be missing something.
> 
> Thanks for the input so far :D
> 
> [1]: https://beam.apache.org/contribute/
> [2]: 
> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L93 
> 
> 
> On Mon, Aug 31, 2020 at 05:36:56PM -0700, Brian Hulette wrote:
>> Hi Teodor,
>> I actually forwarded your message to dev@ before, but I foolishly
>> removed user@ from the distro, so I think you weren't able to see it.
>> Sorry about that!
>>
>> +Lukasz Cwik <lc...@google.com> replied there [1]. I'll copy it here 
>> and we
>> can keep discussing on this thread:
>>
>> The idea in Beam has always been to make objects passed between
>> transforms immutable and to require users to either pass through their
>> object or output a new object.
>>
>> Minimizing the places where Flink performs copying would be best, but
>> without further investigation I wouldn't be able to give concrete
>> suggestions.
>>
>>
>> Brian
>>
>> [1]
>> https://lists.apache.org/thread.html/r8c22c8b089f9caaac8efef90e62117a1db49af6471ff6bd7cbc5b882%40%3Cdev.beam.apache.org%3E 
>>
>>
>> On Mon, Aug 31, 2020 at 11:14 AM Teodor Spæren 
>> <te...@riseup.net>
>> wrote:
>>
>>> Hey!
>>>
>>> First time posting to a mailing list, hope I did it correctly :)
>>>
>>> I'm writing a master's thesis at the University of Oslo, and right now I'm
>>> looking at the performance overhead of using Beam with the Flink runner
>>> versus plain Flink.
>>>
>>> I've written a simple program: a custom source outputting 0, 1, 2, 3, up
>>> to N, going into a single identity operator and then into a filter which
>>> only matches N and prints it out. This is just to compare performance.
>>>
>>> I've been doing some profiling of simple programs, and one observation is
>>> the performance difference in the serialization. The hotspot is [1],
>>> which is used in multiple places; one place is [2], which is called
>>> from [3]. As far as I can tell, [1] seems to implement copying by
>>> first serializing and then deserializing, and there is no way for the
>>> actual types to change this. In Flink, you have control over the copy()
>>> method, as in [4], so for certain types you can simply return the value,
>>> as is done there.
>>>
>>> My question is whether I've understood the flow correctly so far and, if so,
>>> what the reason for doing it this way is. Is it to avoid demanding that the
>>> type implement some kind of cloning? And would it be possible to push
>>> this down the stack and allow the coders to define the copy
>>> semantics? I'm willing to do the work here, I just want to know if it
>>> would work on an architectural level.
>>>
>>> If there are any known overheads of using Beam that you would like to
>>> point out, I would love to hear about them.
>>>
>>> Best regards,
>>> Teodor Spæren
>>>
>>> [1]:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140 
>>>
>>> [2]:
>>> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85 
>>>
>>> [3]:
>>> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85 
>>>
>>> [4]:
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53 
>>>
>>>

Re: Design rationale behind copying via serializing in the Flink runner

Posted by Teodor Spæren <te...@riseup.net>.
Hey Brian!

Sorry for the late reply, this one kind of got lost in my mail client. 
Still trying to figure this mailing list thing out, hehe.

I would like to try to see if a simple return there will speed things 
up. I've never built Beam by hand, though; is a full build, as described 
in [1], required to test this, or can I do a more selective build of 
only the Java portion?

Another question: Flink has object reuse, and it can be turned on 
through options to the Flink runner. If everything passed between 
operators is immutable anyway, why isn't this option enabled by default? 
In this case I don't think it would give a speed-up, as the method is 
just aliased [2]. I haven't fully understood all the aspects of this 
option in Flink, so I might just be missing something.

Thanks for the input so far :D

[1]: https://beam.apache.org/contribute/
[2]: https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L93

On Mon, Aug 31, 2020 at 05:36:56PM -0700, Brian Hulette wrote:
>Hi Teodor,
>I actually forwarded your message to dev@ before, but I foolishly
>removed user@ from the distro, so I think you weren't able to see it. Sorry
>about that!
>
>+Lukasz Cwik <lc...@google.com> replied there [1]. I'll copy it here and we
>can keep discussing on this thread:
>
>The idea in Beam has always been to make objects passed between transforms
>immutable and to require users to either pass through their object or
>output a new object.
>
>Minimizing the places where Flink performs copying would be best, but
>without further investigation I wouldn't be able to give concrete suggestions.
>
>
>Brian
>
>[1]
>https://lists.apache.org/thread.html/r8c22c8b089f9caaac8efef90e62117a1db49af6471ff6bd7cbc5b882%40%3Cdev.beam.apache.org%3E
>
>On Mon, Aug 31, 2020 at 11:14 AM Teodor Spæren <te...@riseup.net>
>wrote:
>
>> Hey!
>>
>> First time posting to a mailing list, hope I did it correctly :)
>>
>> I'm writing a master's thesis at the University of Oslo, and right now I'm
>> looking at the performance overhead of using Beam with the Flink runner
>> versus plain Flink.
>>
>> I've written a simple program: a custom source outputting 0, 1, 2, 3, up
>> to N, going into a single identity operator and then into a filter which
>> only matches N and prints it out. This is just to compare performance.
>>
>> I've been doing some profiling of simple programs, and one observation is
>> the performance difference in the serialization. The hotspot is [1],
>> which is used in multiple places; one place is [2], which is called
>> from [3]. As far as I can tell, [1] seems to implement copying by
>> first serializing and then deserializing, and there is no way for the
>> actual types to change this. In Flink, you have control over the copy()
>> method, as in [4], so for certain types you can simply return the value,
>> as is done there.
>>
>> My question is whether I've understood the flow correctly so far and, if so,
>> what the reason for doing it this way is. Is it to avoid demanding that the
>> type implement some kind of cloning? And would it be possible to push
>> this down the stack and allow the coders to define the copy
>> semantics? I'm willing to do the work here, I just want to know if it
>> would work on an architectural level.
>>
>> If there are any known overheads of using Beam that you would like to
>> point out, I would love to hear about them.
>>
>> Best regards,
>> Teodor Spæren
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L140
>> [2]:
>> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L85
>> [3]:
>> https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java#L85
>> [4]:
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java#L53
>>