Posted to dev@flink.apache.org by Niklas Semmler <ni...@inet.tu-berlin.de> on 2015/07/06 20:36:51 UTC

How do network transmissions in Flink work?

Hello Flink Community,

I am working on a network scheduler and am currently reading Flink's 
code to figure out how the data exchange works. It would be great if you 
could help me with some of my issues and questions.

Basically I want to extract from Flink the time when a data transmission 
between two machines starts (1), their connection details (2), how much 
data is involved (3) and when it ends (4).

So far I have understood that the scheduling of tasks is done via the 
scheduleOrUpdateConsumers JobManagerMessage. In the function of the same 
name in the class Execution I have been able to extract the IP/port pairs 
that both the producer and the consumer(s) use.

Furthermore I understand that in the context of a "blocking" data 
transmission Flink will first create a ResultPartition and store all the 
data in the form of Buffers before starting the transmission. So in 
principle I should be able to figure out what amount of data Flink will 
communicate by looking at the respective 
ResultSubpartition.totalNumberOfBytes, right?
However, in the process I would need to map each ResultSubpartition to a 
slot or deployed task, so that I can associate this amount of data with 
connection details of the sender and the receiver. Any hints on how to 
do that?

Now from what I see the same is not possible in a "pipelined" context, 
correct? Can anything be said about the data to be communicated?

Finally, I was unable to locate in the code and in the logs where a 
Task's state is changing from RUNNING to FINISHED. Could you give me a 
pointer?

It would be great if you could share your insights on the problems above ;).

Best regards,
Niklas

-- 
PhD Student / Research Assistant
INET, TU Berlin
Room 4.029
Marchstr 23
10587 Berlin
Tel: +49 30 314 78752

Re: How do network transmissions in Flink work?

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Niklas,

I’ve missed your answer. Sorry for the delay!

> On 20 Jul 2015, at 19:00, Niklas Semmler <ns...@inet.tu-berlin.de> wrote:
> 
> Hello Ufuk,
> 
> thank you very much for the answer. You helped me to bring a great deal of context into the problem :).
> 
> I have one final question: What is a good indicator that the transfer of data contained in a single ResultPartition is finished? Is there any? Or can the amount of received data be retrieved from the consumer?

The result life-cycle is independent of the task life-cycle.

On the output side you can look into the “notifySubpartitionConsumed()” notification (in ResultSubpartition). This signals that every buffer has been handed over to the network or consuming task (but buffers might still be in flight).

On the input side you can look into the SingleInputGate. The FINISHED event signals that the result partition has been fully received.
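
A minimal sketch of the input-side check, assuming that "fully received" means every input channel of the gate has delivered its end-of-partition event. GateSketch and its methods are hypothetical names for illustration, not Flink's SingleInputGate API:

```java
import java.util.BitSet;

// Hypothetical stand-in for an input gate; not Flink's SingleInputGate.
final class GateSketch {
    private final int numChannels;
    private final BitSet finishedChannels;

    GateSketch(int numChannels) {
        this.numChannels = numChannels;
        this.finishedChannels = new BitSet(numChannels);
    }

    // Call when the end-of-partition event arrives on the given channel.
    // Setting the same bit twice is harmless, so duplicates are ignored.
    void onEndOfPartition(int channelIndex) {
        if (channelIndex < 0 || channelIndex >= numChannels) {
            throw new IndexOutOfBoundsException("channel " + channelIndex);
        }
        finishedChannels.set(channelIndex);
    }

    // True once every channel has reported its end-of-partition event.
    boolean isFinished() {
        return finishedChannels.cardinality() == numChannels;
    }
}
```

If you record a timestamp at the moment isFinished() first returns true, that would be one candidate for the "transfer ended" time on the receiving side.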

Does this help? What are you trying to do with the partitions?

Again, sorry for the delay.

– Ufuk


Re: How do network transmissions in Flink work?

Posted by Niklas Semmler <ns...@inet.tu-berlin.de>.
Hello Ufuk,

thank you very much for the answer. You helped me to bring a great deal 
of context into the problem :).

I have one final question: What is a good indicator that the transfer of 
data contained in a single ResultPartition is finished? Is there any? Or 
can the amount of received data be retrieved from the consumer?

So far the only indicator I could come up with is the release of the 
ResultPartition or the state change to "FINISHED" of the task. However, 
as far as I understand the assigned resources will only be released 
after all ResultPartitions are transferred and the task is finished, so 
that seems to be a rather impractical indicator for the end of the data 
exchange.

Sorry for the late reply.

Cheers,
Niklas

On 13.07.2015 15:04, Ufuk Celebi wrote:
> Hey Niklas,
>
> there is also this Wiki entry: https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
>
> On 09 Jul 2015, at 21:32, Niklas Semmler <ns...@inet.tu-berlin.de> wrote:
>
>> 1. What does the number of ResultSubpartition instances in the ResultPartition correspond to? Is one assigned to each consuming task? If so, how can I find for each ResultSubpartition the corresponding Task, Slot or similar? If not, how is it decided which piece of the data is routed to which consuming task?
>
> Yes, there is one subpartition for each consuming task. The wiring depends on the DistributionPattern and the parallelism of the producing and consuming operators. You can look into the ExecutionGraph to see how the wiring works (see the connect* methods in the ExecutionVertex class). Each subpartition corresponds to an ExecutionEdge, which connects two ExecutionVertex instances; an ExecutionVertex is the runtime abstraction for a task. This is essentially also where the routing is set.
>
> Currently there is no way to get from the subpartition to the corresponding task. You would have to look into the places where the instances are created and pass the reference along. The RuntimeEnvironment or Task class creates these instances when a new task is submitted to a task manager.
>
>
>> 2. What defines the number of Buffer instances per ResultSubpartition? Does one Buffer correspond to exactly one serialized Record? Is a Record the single output of an operator, are there multiple records per operator, or
>> does it differ depending on the operator?
>
> The number of produced buffers depends on the data the corresponding operator/user function produces. Each produced record is serialized into a buffer; a record can span multiple buffers, depending on its size.
>
> There can be zero or more records per produced partition. (There will always be at least a single buffer containing an end-of-partition event per partition though.)
>
>> 3. Or are the Buffers defined in a completely different manner? In that case, could you give me a pointer to understand how Buffer instances are used?
>
> A Buffer is a wrapper for a MemorySegment with a reference to the buffer pool that owns it. Buffers are recycled after they have been consumed (e.g. after being written to the TCP channel or by the user code).
>
>
> Feel free to ask further questions or give feedback if you encounter anything you find weird. :-)
>
> – Ufuk
>


Re: How do network transmissions in Flink work?

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Niklas,

there is also this Wiki entry: https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks

On 09 Jul 2015, at 21:32, Niklas Semmler <ns...@inet.tu-berlin.de> wrote:

> 1. What does the number of ResultSubpartition instances in the ResultPartition correspond to? Is one assigned to each consuming task? If so, how can I find for each ResultSubpartition the corresponding Task, Slot or similar? If not, how is it decided which piece of the data is routed to which consuming task?

Yes, there is one subpartition for each consuming task. The wiring depends on the DistributionPattern and the parallelism of the producing and consuming operators. You can look into the ExecutionGraph to see how the wiring works (see the connect* methods in the ExecutionVertex class). Each subpartition corresponds to an ExecutionEdge, which connects two ExecutionVertex instances; an ExecutionVertex is the runtime abstraction for a task. This is essentially also where the routing is set.

Currently there is no way to get from the subpartition to the corresponding task. You would have to look into the places where the instances are created and pass the reference along. The RuntimeEnvironment or Task class creates these instances when a new task is submitted to a task manager.
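
To make the wiring idea concrete, here is a rough sketch of how subpartitions could be matched to consuming tasks. It is a simplified model, not the connect* code in ExecutionVertex: all class names are made up, the two pattern names are assumptions modeled on DistributionPattern (check the enum in your checkout), and the pointwise case only handles equal parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration only.
enum PatternSketch { ALL_TO_ALL, POINTWISE }

final class EdgeSketch {
    final int producer;      // index of the producing task
    final int subpartition;  // index of the subpartition inside that producer's partition
    final int consumer;      // index of the consuming task

    EdgeSketch(int producer, int subpartition, int consumer) {
        this.producer = producer;
        this.subpartition = subpartition;
        this.consumer = consumer;
    }
}

final class WiringSketch {
    // Returns one edge per (producer subpartition, consumer) pair.
    static List<EdgeSketch> connect(PatternSketch pattern, int producers, int consumers) {
        List<EdgeSketch> edges = new ArrayList<>();
        if (pattern == PatternSketch.ALL_TO_ALL) {
            // Every producer has one subpartition per consumer;
            // consumer c reads subpartition c of every producer.
            for (int p = 0; p < producers; p++) {
                for (int c = 0; c < consumers; c++) {
                    edges.add(new EdgeSketch(p, c, c));
                }
            }
        } else {
            // Assumption: equal parallelism, so producer i feeds only consumer i.
            if (producers != consumers) {
                throw new IllegalArgumentException(
                        "this sketch only handles equal parallelism for POINTWISE");
            }
            for (int p = 0; p < producers; p++) {
                edges.add(new EdgeSketch(p, 0, p));
            }
        }
        return edges;
    }
}
```

A table like this, built at the place where the instances are created, is one way to carry the "subpartition to consuming task" association that is not available from the subpartition itself.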


> 2. What defines the number of Buffer instances per ResultSubpartition? Does one Buffer correspond to exactly one serialized Record? Is a Record the single output of an operator, are there multiple records per operator, or 
> does it differ depending on the operator?

The number of produced buffers depends on the data the corresponding operator/user function produces. Each produced record is serialized into a buffer; a record can span multiple buffers, depending on its size.

There can be zero or more records per produced partition. (There will always be at least a single buffer containing an end-of-partition event per partition though.)
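
As a back-of-the-envelope illustration of the relation between records and buffers, the sketch below estimates a buffer count. It assumes records are written back-to-back into fixed-size buffers, each with a 4-byte length prefix, and that the end-of-partition event takes one extra buffer. The prefix size and the class name are assumptions for illustration; buffers that are sent before they are full would raise the real count.

```java
// Rough estimate only; not Flink's serializer logic.
final class BufferCountSketch {
    static final int LENGTH_PREFIX_BYTES = 4; // assumed per-record length header

    static long buffersNeeded(int[] recordSizes, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        long totalBytes = 0;
        for (int size : recordSizes) {
            totalBytes += LENGTH_PREFIX_BYTES + (long) size;
        }
        // Ceiling division: a record may spill over into the next buffer.
        long dataBuffers = (totalBytes + bufferSize - 1) / bufferSize;
        // One more buffer for the end-of-partition event, even with zero records.
        return dataBuffers + 1;
    }
}
```

For example, a single 100-byte record with 32-byte buffers occupies 104 bytes, so it spans four data buffers, plus one for the event.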

> 3. Or are the Buffers defined in a completely different manner? In that case, could you give me a pointer to understand how Buffer instances are used?

A Buffer is a wrapper for a MemorySegment with a reference to the buffer pool that owns it. Buffers are recycled after they have been consumed (e.g. after being written to the TCP channel or by the user code).
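
The ownership and recycling idea can be sketched as follows. This is a toy model with made-up names (PoolSketch, BufferSketch) and a plain byte[] in place of a MemorySegment; it leaves out thread safety and everything else the real classes handle.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy pool that owns a fixed number of memory segments.
final class PoolSketch {
    private final Deque<byte[]> free = new ArrayDeque<>();

    PoolSketch(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            free.push(new byte[segmentSize]);
        }
    }

    // Hands out a buffer, or null when no segment is currently available.
    BufferSketch request() {
        byte[] segment = free.poll();
        return segment == null ? null : new BufferSketch(segment, this);
    }

    void giveBack(byte[] segment) {
        free.push(segment);
    }

    int available() {
        return free.size();
    }
}

// Wrapper around a segment that remembers which pool owns it.
final class BufferSketch {
    private byte[] segment;
    private final PoolSketch owner;

    BufferSketch(byte[] segment, PoolSketch owner) {
        this.segment = segment;
        this.owner = owner;
    }

    byte[] memory() {
        if (segment == null) {
            throw new IllegalStateException("already recycled");
        }
        return segment;
    }

    // Called by whoever consumed the buffer; returns the memory to the pool.
    void recycle() {
        if (segment == null) {
            throw new IllegalStateException("already recycled");
        }
        owner.giveBack(segment);
        segment = null;
    }
}
```

The point of the back-reference is that the consumer does not need to know where the memory came from: it only calls recycle().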


Feel free to ask further questions or give feedback if you encounter anything you find weird. :-)

– Ufuk

Re: How do network transmissions in Flink work?

Posted by Niklas Semmler <ns...@inet.tu-berlin.de>.
Hi Stephan,

thanks for the input :).

I have some further questions on how the data is segmented before/when 
it is moved over the network:

1. What does the number of ResultSubpartition instances in the 
ResultPartition correspond to? Is one assigned to each consuming task? 
If so, how can I find for each ResultSubpartition the corresponding 
Task, Slot or similar? If not, how is it decided which piece of the data 
is routed to which consuming task?

2. What defines the number of Buffer instances per ResultSubpartition? 
Does one Buffer correspond to exactly one serialized Record? Is a Record 
the single output of an operator, are there multiple records per 
operator, or does it differ depending on the operator?

3. Or are the Buffers defined in a completely different manner? In that 
case, could you give me a pointer to understand how Buffer instances are 
used?

It would be great if you could help me answer those. Even a partial 
answer would be great ;).

Best regards,
Niklas


On 08.07.2015 13:54, Stephan Ewen wrote:
> Hi!
>
> Here are a few pointers:
>
>    - The data transfer is the responsibility of the receiver. The sender
> cannot know ahead of time where data is sent.
>
>    - On the receiver side, you should be able to count the received bytes in
> the RemoteInputChannel or LocalInputChannel.
>
>    - The JobManager is notified of the final state of a task when the task
> is completed (successfully or unsuccessfully) and un-registers. See
> "org.apache.flink.runtime.taskmanager.Task#notifyFinalState()".
>
> Let us know if you have more questions.
>
> Greetings,
> Stephan


Re: How do network transmissions in Flink work?

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Here are a few pointers:

  - The data transfer is the responsibility of the receiver. The sender
cannot know ahead of time where data is sent.

  - On the receiver side, you should be able to count the received bytes in
the RemoteInputChannel or LocalInputChannel.

  - The JobManager is notified of the final state of a task when the task
is completed (successfully or unsuccessfully) and un-registers. See
"org.apache.flink.runtime.taskmanager.Task#notifyFinalState()".
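
A minimal sketch of what receiver-side counting could look like, assuming you add a hook at the place where an input channel hands over a received buffer. TransferLog, ChannelStats, the channel id string, and the onBuffer hook are all hypothetical; nothing here is an existing Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Per-channel record of the four things asked for: start, peer, volume, end.
final class ChannelStats {
    final String remoteAddress; // connection details of the sender
    long firstNanos = -1;       // time of the first received buffer
    long lastNanos = -1;        // time of the most recent received buffer
    long bytes = 0;             // total bytes received so far

    ChannelStats(String remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
}

final class TransferLog {
    private final Map<String, ChannelStats> byChannel = new LinkedHashMap<>();

    // Hypothetical hook: call once for every buffer a channel receives.
    void onBuffer(String channelId, String remoteAddress, int sizeInBytes, long nowNanos) {
        ChannelStats stats =
                byChannel.computeIfAbsent(channelId, id -> new ChannelStats(remoteAddress));
        if (stats.firstNanos < 0) {
            stats.firstNanos = nowNanos;
        }
        stats.lastNanos = nowNanos;
        stats.bytes += sizeInBytes;
    }

    ChannelStats get(String channelId) {
        return byChannel.get(channelId);
    }
}
```

In a real hook the timestamp would come from something like System.nanoTime(); it is passed in here so the class stays easy to test.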

Let us know if you have more questions.

Greetings,
Stephan

