Posted to user@flink.apache.org by Dmitry Golubets <dg...@gmail.com> on 2017/01/13 20:31:35 UTC

Can serialization be disabled between chains?

Hi,

Let's say we have multiple subtask chains and all of them are executing in
the same task manager slot (i.e. in the same JVM).
What's the point in serializing data between them?
Can it be disabled?

The reason I want to keep different chains is that some subtasks should be
executed in parallel with each other.
Let's say I have tasks: A -> B
After task A has pushed some data to task B, I want task A to continue
processing without waiting for task B to finish.
What I'm describing is the behavior of Akka Streams with fusion disabled.
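Something like this is what I have in mind - A and B kept in separate chains
so that A can keep processing while B works on earlier records. A rough Scala
sketch (operator bodies are just placeholders):

import org.apache.flink.streaming.api.scala._

object ChainSplitSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.socketTextStream("localhost", 9999) // placeholder input

    // "Task A": stays in its own chain.
    val a = source
      .map(_.trim.toLowerCase)
      .name("task-A")

    // startNewChain() cuts the chain here, so "task-B" runs in a separate
    // task thread and A does not wait for B per record. Records handed from
    // A to B are serialized on this boundary - that is the step I'd like
    // to avoid.
    val b = a
      .map(_.length)
      .startNewChain()
      .name("task-B")

    b.print()
    env.execute("chain split sketch")
  }
}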

Best regards,
Dmitry

Re: Can serialization be disabled between chains?

Posted by Ufuk Celebi <uc...@apache.org>.
+1 to what Fabian said. Regarding the memory consumption: Flink's back
pressure mechanism also depends on this, because the availability of
(network) buffers determines how fast an operator can produce data. If no
buffers are available, the producing operator will slow down.
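The number of those buffers is a cluster-level setting
(taskmanager.network.numberOfBuffers in flink-conf.yaml), but how long a
partially filled buffer is held back before it is shipped downstream can be
tuned per job - a minimal sketch:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// A serialized output buffer is shipped when it is full or when this timeout
// expires: lower values reduce latency, higher values favour throughput.
env.setBufferTimeout(100) // milliseconds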

Re: Can serialization be disabled between chains?

Posted by Dmitry Golubets <dg...@gmail.com>.
The first issue is not a problem with idiomatic Scala - we make all our data
objects immutable.
The second one... yeah, I guess it makes sense.
Thanks for the clarification.

Best regards,
Dmitry

Re: Can serialization be disabled between chains?

Posted by Fabian Hueske <fh...@gmail.com>.
One of the reasons is to ensure that data cannot be modified after it has
left a thread.
A function that emits the same object several times (in order to reduce
object creation & GC) might accidentally modify already emitted records if
they were put as objects in a queue.
Moreover, it is easier to control memory consumption if data is serialized
into a fixed number of buffers instead of being kept as objects on the JVM
heap.
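
For completeness: there is an opt-in that trades these defensive copies
inside a chain for performance - a minimal sketch, and only safe when
records are treated as immutable:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// By default, records handed between chained operators are defensively copied,
// so a function that mutates or re-emits the same object cannot corrupt
// records that are already downstream. enableObjectReuse() skips those copies.
env.getConfig.enableObjectReuse()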

Best, Fabian

Re: Can serialization be disabled between chains?

Posted by Dmitry Golubets <dg...@gmail.com>.
Hi Ufuk,

Do you know the reason for serializing data between different threads?

Also, thanks for the link!

Best regards,
Dmitry

Re: Can serialization be disabled between chains?

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Dmitry,

If I'm understanding you correctly, this is not possible.

A task chain is executed by a single task thread, so it is not possible to
continue processing before the record "leaves" that thread, which only
happens when the next task thread or the network stack consumes it.

Handover between chained tasks happens without serialization; only data
passed between different task threads is serialized.

Depending on your use case, the newly introduced async I/O feature might be
worth a look (it will be part of the upcoming 1.2 release):
https://github.com/apache/flink/pull/2629
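
Roughly, it lets an operator fire off many asynchronous requests without
blocking the task thread while waiting for the answers. A sketch of the
intended usage (the interface may still change before the release, so take
the exact class names with a grain of salt):

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

object AsyncIoSketch {

  // Hypothetical enrichment: resolve something for each key via an external
  // service without blocking the task thread while the request is in flight.
  class EnrichFunction extends AsyncFunction[String, (String, Int)] {
    implicit lazy val ec: ExecutionContext = ExecutionContext.global

    override def asyncInvoke(key: String, resultFuture: ResultFuture[(String, Int)]): Unit =
      Future {
        (key, key.length) // stand-in for the real external call
      }.onComplete {
        case Success(result) => resultFuture.complete(Iterable(result))
        case Failure(t)      => resultFuture.completeExceptionally(t)
      }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val keys = env.fromElements("a", "bb", "ccc")

    // Up to 100 requests in flight at once, each with a 1 second timeout;
    // unorderedWait emits results as soon as they arrive.
    val enriched =
      AsyncDataStream.unorderedWait(keys, new EnrichFunction, 1000L, TimeUnit.MILLISECONDS, 100)

    enriched.print()
    env.execute("async i/o sketch")
  }
}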