Posted to user@flink.apache.org by Ventura Del Monte <ve...@gmail.com> on 2015/05/22 10:58:58 UTC

repartition locally to task manager

Hello,

I am trying to introduce a new feature in my Flink project: I would like to
shuffle (randomly repartition) my dataset only locally within a task
manager, so that each worker on that task manager gets a different set of
objects to work on. I have looked into Flink's internal mechanisms, and I
think (I hope) I understand how it handles partitions. I see two ways to
do it:

a) Use a mapPartition that, for each input object X, outputs a tuple
(X, destinationChannel), where destinationChannel is the id of the worker
that will receive X. The main problem with this solution is determining
the correct destinationChannel inside the mapPartition task. As far as I
can tell, no operation in Flink knows which task manager it is executed
on, so I would need to read the task manager configuration to get the
number of slots available on the current TM. But then how do I relate
this number to the total channel count, given that I could have a
situation like this:

+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |  8 |  9 | 10 | 11 | 12 | 13 |
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|        TM1        |             TM2             |        TM3        |
+-------------------+-----------------------------+-------------------+

So even if I knew that TM2 has 6 slots, I would not be able to tell that
their channel ids are in the range [4, 9].
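For illustration, here is a plain-Java sketch (no Flink API involved) of the arithmetic option a) would need if channels were assigned to TaskManagers contiguously, in TaskManager order. That contiguity is an assumption, not a guarantee, and `ContiguousRanges`/`rangeOf` are hypothetical names:

```java
import java.util.Arrays;

// Hypothetical helper: computes one TaskManager's channel-id range under the
// ASSUMPTION that channels are handed out contiguously, in TaskManager order,
// according to each TaskManager's slot count. Flink does not promise this.
final class ContiguousRanges {

    // Returns {first, last} (both inclusive) for the TaskManager at index tm.
    static int[] rangeOf(int[] slotsPerTm, int tm) {
        if (tm < 0 || tm >= slotsPerTm.length) {
            throw new IllegalArgumentException("unknown TaskManager index: " + tm);
        }
        int first = 0;
        for (int i = 0; i < tm; i++) {
            first += slotsPerTm[i]; // skip the slots of all preceding TaskManagers
        }
        return new int[] {first, first + slotsPerTm[tm] - 1};
    }

    public static void main(String[] args) {
        int[] slotsPerTm = {4, 6, 4}; // TM1, TM2, TM3 as in the diagram above
        System.out.println(Arrays.toString(rangeOf(slotsPerTm, 1))); // [4, 9]
    }
}
```

With the slot counts from the diagram it yields [4, 9] for TM2, but the result is only meaningful if the scheduler really assigns slots that way.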

b) Destination channels are chosen in RegularPactTask.getOutputCollector,
so modifying this method to make range or custom partitioning
task-manager-aware would make the local repartition possible. However,
this requires changes to the Flink runtime.
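Option b) essentially means a channel selector that only considers channels hosted on the sender's TaskManager. The following is a minimal plain-Java model of that decision, not a patch to RegularPactTask; `LocalChannelSelector` and the `channelToTm` mapping are hypothetical, and obtaining that mapping is precisely what the runtime would have to provide:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical model of a TaskManager-aware channel selector; this is NOT a Flink
// interface. channelToTm[c] is the TaskManager hosting output channel c, and the
// mapping may be arbitrary (a TaskManager's channels need not be contiguous).
final class LocalChannelSelector {
    private final Map<Integer, List<Integer>> channelsByTm = new HashMap<>();
    private final Random random;

    LocalChannelSelector(int[] channelToTm, long seed) {
        for (int c = 0; c < channelToTm.length; c++) {
            // Precompute, per TaskManager, the list of channels it hosts.
            channelsByTm.computeIfAbsent(channelToTm[c], tm -> new ArrayList<>()).add(c);
        }
        this.random = new Random(seed);
    }

    // Picks a channel uniformly at random among those on the sender's TaskManager.
    int selectChannel(int senderTm) {
        List<Integer> local = channelsByTm.get(senderTm);
        if (local == null) {
            throw new IllegalStateException("no channel on TaskManager " + senderTm);
        }
        return local.get(random.nextInt(local.size()));
    }

    public static void main(String[] args) {
        // Interleaved assignment: channels 0, 3 and 4 live on TaskManager 0.
        LocalChannelSelector selector =
            new LocalChannelSelector(new int[] {0, 1, 2, 0, 0, 1, 2}, 42L);
        for (int i = 0; i < 5; i++) {
            System.out.println(selector.selectChannel(0)); // always 0, 3 or 4
        }
    }
}
```

Keeping a per-TaskManager list of channels, instead of a [first, last] range, also covers the case where a TaskManager's channels are not contiguous.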

To be honest, I would like to avoid option b), but I think I am at a dead
end and will have to edit the runtime.

Do you have better suggestions? Thank you in advance.

Re: repartition locally to task manager

Posted by Stephan Ewen <se...@apache.org>.
Hi Ventura!

Concerning (1):

What would be good is to make the
"org.apache.flink.runtime.instance.InstanceConnectionInfo" available in the
RuntimeContext object returned by getRuntimeContext(). To do that, we would
need to move that class into the flink-core package. We could also simply
rename it to "ConnectionInfo".
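As a sketch of how such information could be used, assume (hypothetically) that every parallel instance can report the host of its TaskManager, and that these reports have been collected in an array indexed by subtask. Grouping subtask indices by host then gives each TaskManager's set of receivers. `SubtasksByHost` is a made-up name, channel id i is assumed to correspond to receiving subtask i, and grouping by host only equals grouping by TaskManager if each host runs a single TaskManager:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: ASSUMES each parallel subtask could report its TaskManager's
// host (e.g. through a ConnectionInfo exposed in the RuntimeContext, as proposed).
// Given those reports, indexed by subtask, it groups subtask indices by host.
final class SubtasksByHost {

    static Map<String, List<Integer>> groupByHost(String[] hostOfSubtask) {
        Map<String, List<Integer>> byHost = new TreeMap<>(); // sorted for stable output
        for (int subtask = 0; subtask < hostOfSubtask.length; subtask++) {
            byHost.computeIfAbsent(hostOfSubtask[subtask], h -> new ArrayList<>()).add(subtask);
        }
        return byHost;
    }

    public static void main(String[] args) {
        String[] reports = {"tm-a", "tm-b", "tm-a", "tm-c", "tm-b"};
        System.out.println(groupByHost(reports)); // {tm-a=[0, 2], tm-b=[1, 4], tm-c=[3]}
    }
}
```

How the reports are gathered (for example in a preliminary step of the job) is left open here.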


Concerning (2):

I think this may be a bit harder to add. I am curious what your results are
without this optimization.


Stephan


On Mon, Jun 8, 2015 at 4:49 PM, Ventura Del Monte <venturadelmonte@gmail.com> wrote:

Re: repartition locally to task manager

Posted by Ventura Del Monte <ve...@gmail.com>.
Hi Stephan,

Many thanks for your reply!

1) This would be a nice feature. I have already done something similar; if
you tell me which information you would like to expose in the runtime
context, I could add it to my code, update the unit tests, and share it.

2) Yes, I have figured that out. However, I needed this kind of local
repartition because I was working on a dataset sampler based on the filter
operator (this is the first step of the iterative pipeline I am
developing). To be honest, this repartition is just a plus, because I have
already achieved good results (even though a sampler like the one Spark
offers for low sampling ratios would be a nice feature). The main drawback
of this filter operation is that it always takes the same partition as
input. If the partition is big enough, the probability of sampling
different items in consecutive filtering operations should still be high
(given, of course, a good sampling factor and a correctly seeded RNG). Yet
if it were possible to shuffle the partitions on the same task manager, I
think the following sampling operation would benefit, since the produced
partition would contain different items with an even higher probability.
Of course, this shuffle (being local to each TM) should involve neither
network nor disk transfer; otherwise, the game is not worth the candle.
About the change of parallelism: I read that it triggers a sort of local
re-distribution, but I do not think that is my case. Anyway, do you think
this kind of shuffling/sampling can be achieved in Flink? Does it make
sense in your opinion?
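To make the two ideas concrete, here is a plain-Java model, not Flink code. The sampler is a Bernoulli (coin-flip) sampler, one common way to sample with a filter; it is not necessarily the sampler described above, nor Spark's. The local shuffle simply pools the partitions assumed to live on one TaskManager and redistributes them in memory; `LocalSampling` and its methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Plain-Java model (no Flink API) of the two ideas above:
// 1. a filter-style Bernoulli sampler that keeps each element independently
//    with probability `fraction`, driven by an explicitly seeded RNG;
// 2. a hypothetical in-memory "local shuffle" that pools the partitions of one
//    TaskManager, shuffles them, and deals the elements back round-robin.
final class LocalSampling {

    static <T> List<T> bernoulliSample(List<T> partition, double fraction, long seed) {
        Random rng = new Random(seed);
        List<T> sample = new ArrayList<>();
        for (T item : partition) {
            // nextDouble() is in [0, 1), so fraction 0.0 keeps nothing, 1.0 keeps all.
            if (rng.nextDouble() < fraction) {
                sample.add(item);
            }
        }
        return sample;
    }

    static <T> List<List<T>> localShuffle(List<List<T>> localPartitions, long seed) {
        List<T> pool = new ArrayList<>();
        for (List<T> partition : localPartitions) {
            pool.addAll(partition);
        }
        Collections.shuffle(pool, new Random(seed));
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < localPartitions.size(); i++) {
            result.add(new ArrayList<>());
        }
        // Round-robin dealing: partition sizes end up differing by at most one.
        for (int i = 0; i < pool.size(); i++) {
            result.get(i % result.size()).add(pool.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> partition = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            partition.add(i);
        }
        // Hypothetical seeding: a different seed for each sampling iteration.
        for (int iteration = 0; iteration < 3; iteration++) {
            System.out.println(bernoulliSample(partition, 0.2, 1000L + iteration));
        }
        List<List<Integer>> onOneTm = new ArrayList<>();
        onOneTm.add(new ArrayList<>(partition.subList(0, 10)));
        onOneTm.add(new ArrayList<>(partition.subList(10, 20)));
        System.out.println(localShuffle(onOneTm, 7L));
    }
}
```

Dealing the shuffled pool round-robin also evens out partition sizes; that is a design choice, and a variant could preserve each partition's original size instead.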


Best Regards,
Ventura

2015-06-03 14:57 GMT+02:00 Stephan Ewen <se...@apache.org>:

Re: repartition locally to task manager

Posted by Stephan Ewen <se...@apache.org>.
Hi Ventura!

Sorry for the late response. Here are a few ideas or comments that may help
you:

1) We want to make it possible for a function (such as MapFunction) to
figure out on which TaskManager it is running. The mechanism would be
something like "getRuntimeContext().getTaskManagerInformation()". That
should help you determine which TaskManager you are running on.

2) When you are scheduling tasks, it is not guaranteed that slots 0, 1, 2,
... are on the same TaskManager. The assignment is based on the locality of
the input data stream and the availability of slots.
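Because of point 2), a range such as [4, 9] should only be trusted after checking the actual assignment. The plain-Java sketch below returns a TaskManager's channel range only if its channels really are contiguous; the `channelToTm` array (channel id to TaskManager index) is hypothetical input that would have to come from something like the information in point 1), and `ContiguityCheck` is a made-up name:

```java
import java.util.Optional;

// Hypothetical check: given an observed channel -> TaskManager assignment, return
// the TaskManager's channel range {first, last} only if its channels are contiguous.
final class ContiguityCheck {

    static Optional<int[]> contiguousRange(int[] channelToTm, int tm) {
        int first = -1;
        int last = -1;
        int count = 0;
        for (int c = 0; c < channelToTm.length; c++) {
            if (channelToTm[c] == tm) {
                if (first < 0) {
                    first = c;
                }
                last = c;
                count++;
            }
        }
        // Contiguous iff every channel between first and last belongs to tm.
        if (count == 0 || last - first + 1 != count) {
            return Optional.empty();
        }
        return Optional.of(new int[] {first, last});
    }

    public static void main(String[] args) {
        int[] asInDiagram = {0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2};
        int[] interleaved = {0, 1, 0, 2, 1, 1, 0, 1, 2, 1, 2, 0, 2, 1};
        System.out.println(contiguousRange(asInDiagram, 1).isPresent()); // true
        System.out.println(contiguousRange(interleaved, 1).isPresent()); // false
    }
}
```

Both arrays give the same slot counts per TaskManager (4, 6, 4), which is why slot counts alone cannot identify the range.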


Can you explain a bit more what the feature you want to add actually tries
to achieve? Then I may be able to give you more pointers.

When you say that you need local re-distribution, do you mean something
like the picture below, where a change of parallelism between operators
means that data is repartitioned only locally (not across TaskManager
boundaries)?


 (map) (map)  (map) (map)
   \     /      \    /
    \   /        \  /
   (reduce)    (reduce)
      ^ ^        ^ ^
      | \        / |
      |  +------+  |
      | /        \ |
   (source)     (source)
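A minimal model of the picture, assuming a pointwise connection in which the number of consumers is a multiple of the number of producers. `PointwiseFanOut` is a hypothetical name, and this does not describe how Flink actually wires or schedules such tasks; whether paired tasks share a TaskManager still depends on scheduling:

```java
import java.util.Arrays;

// Hypothetical model of the picture above (not Flink's scheduler): with a
// pointwise connection from n producers to m = k * n consumers, producer i
// feeds only consumers i*k .. i*k + k - 1, so records never leave its group.
final class PointwiseFanOut {

    static int[] consumersOf(int producer, int producers, int consumers) {
        if (producers <= 0 || consumers % producers != 0) {
            throw new IllegalArgumentException("sketch assumes consumers = k * producers");
        }
        if (producer < 0 || producer >= producers) {
            throw new IllegalArgumentException("no such producer: " + producer);
        }
        int k = consumers / producers;
        int[] targets = new int[k];
        for (int j = 0; j < k; j++) {
            targets[j] = producer * k + j;
        }
        return targets;
    }

    public static void main(String[] args) {
        // Two reducers feeding four mappers, as drawn above.
        for (int reducer = 0; reducer < 2; reducer++) {
            System.out.println(Arrays.toString(consumersOf(reducer, 2, 4)));
        }
        // Prints [0, 1] and then [2, 3].
    }
}
```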



Greetings,
Stephan



On Fri, May 22, 2015 at 10:58 AM, Ventura Del Monte <venturadelmonte@gmail.com> wrote:
