Posted to dev@beam.apache.org by Manuela Chamda Tchakoute <ch...@gmail.com> on 2020/11/03 09:51:54 UTC

Please can some one remove me from this chat?

Hi,
Please can someone remove me from this chat?
On Oct 27, 2020 11:49 AM, "David Morávek" <dm...@apache.org> wrote:

> Hi Teodor,
>
> Thanks for bringing this up. This is a known, long-standing "issue".
> Unfortunately, there are a few things we need to consider:
>
> - As you correctly noted, the *Beam model doesn't enforce immutability*
> of input / output elements, so this is the price.
> - We *cannot break* existing pipelines.
> - The Flink Runner needs to provide the *same guarantees as the Beam model*.
>
> There are definitely some things we can do here to make things faster:
>
> - We can try an approach similar to HadoopIO's (HadoopInputFormatReader#isKnownImmutable)
> and check for known immutable types (KV, primitives, protobuf, other known
> internal immutable structures).
> - *If the type is immutable, we can safely reuse it.* This should cover
> most of the performance costs without breaking the guarantees the Beam model
> provides.
> - We could enable registration of custom "immutable" types via pipeline
> options? (This may be an unnecessary knob, so it needs further
> discussion.)
>
> WDYT?
>
> D.
>
>
> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <te...@riseup.net>
> wrote:
>
>> Hey!
>>
>> I'm a student at the University of Oslo, and I'm writing a master's thesis
>> about the possibility of using Beam to benchmark stream processing
>> systems. An important factor in this is the overhead associated with
>> using Beam over writing code for the runner directly. [1] found that
>> there was a large overhead associated with using Beam, but did not
>> investigate where this overhead came from. I've done benchmarks and
>> confirmed the findings there, where for simple chains of identity
>> operators, Beam is 43 times slower than the Flink equivalent.
>>
>> These are very simple pipelines, with custom sources that just output a
>> series of integers. By profiling, I've found that most of the overhead
>> comes from serializing and deserializing, specifically from the way
>> TypeSerializer [2] is implemented in [3], where each object is
>> serialized and then deserialized between every operator. Looking into
>> the semantics of Beam, no operator should change the input, so we don't
>> need to do a copy here. The function in [3] could potentially be changed
>> to a single `return` statement.
>>
>> Doing this removes 80% of the overhead in my tests. This is a very
>> synthetic example, but it's low-hanging fruit and might give a speed
>> boost to many pipelines when run on the Flink runner. I would like to
>> make this my first contribution to Beam, but as the guide [4] says, I
>> thought I'd ask here first to see if there is a reason not to do this.
>>
>> The only objection I can see is that it might break existing pipelines
>> that rely on the Flink runner shielding them when they do not follow the
>> immutability guarantee. I see this as a small loss, as they are relying
>> on an implementation detail of the Flink runner.
>>
>> I hope I have explained this adequately, and I eagerly await any feedback :)
>>
>> Best regards,
>> Teodor Spæren
>>
>> [1]: https://arxiv.org/abs/1907.08302
>> [2]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>> [3]: https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>> [4]: https://beam.apache.org/contribute/
>>
>
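
The trade-off discussed in the thread above can be illustrated with a minimal,
self-contained Java sketch. It is not the actual CoderTypeSerializer code: the
class CopySketch, the allow-list KNOWN_IMMUTABLE, and the methods
isKnownImmutable, copyBySerialization and copy are all hypothetical names, and
plain java.io serialization stands in for a Beam Coder. It only shows the idea
of skipping the serialize/deserialize round trip for types known to be
immutable while still copying everything else defensively.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Set;

/** Hypothetical sketch; not the real Beam or Flink implementation. */
final class CopySketch {

  // Assumption: a small allow-list of JDK types that are known to be immutable.
  private static final Set<Class<?>> KNOWN_IMMUTABLE =
      Set.of(String.class, Integer.class, Long.class, Double.class, Boolean.class);

  /** Returns true if the value can be shared between operators without a copy. */
  static boolean isKnownImmutable(Object value) {
    return value == null || KNOWN_IMMUTABLE.contains(value.getClass());
  }

  /** Defensive copy via a serialize/deserialize round trip (stand-in for a Coder). */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T copyBySerialization(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Could not copy element", e);
    }
  }

  /** Reuses known-immutable values and copies everything else. */
  static <T extends Serializable> T copy(T value) {
    return isKnownImmutable(value) ? value : copyBySerialization(value);
  }

  public static void main(String[] args) {
    String text = "hello";
    // Known-immutable value: the same reference is handed on.
    System.out.println("String reused: " + (copy(text) == text));

    ArrayList<Integer> list = new ArrayList<>();
    list.add(1);
    ArrayList<Integer> copied = copy(list);
    // Mutable value: a distinct but equal object is produced.
    System.out.println("List copied: " + (copied != list && copied.equals(list)));
  }
}
```

In this sketch, a pipeline that mutates a list downstream still cannot affect
the upstream element, while strings and boxed primitives avoid the round trip
entirely. Whether a given real-world type belongs on such an allow-list is
exactly the open question raised in the thread.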

Re: Please can some one remove me from this chat?

Posted by Ismaël Mejía <ie...@gmail.com>.
To stop receiving the messages, you need to send an email to:
dev-unsubscribe@beam.apache.org

On Tue, Nov 3, 2020 at 10:52 AM Manuela Chamda Tchakoute
<ch...@gmail.com> wrote:
>
> Hi,
> Please can someone remove me from this chat?