Posted to user@flink.apache.org by Konstantinos Barmpis <ko...@york.ac.uk> on 2018/03/27 12:52:32 UTC

regarding the use of colocation groups

Hello,

I was wondering how to properly use colocation groups (if applicable) to
achieve the required functionality in the following two simple contrived
use-cases (focusing on the essence of the problem), both of which aim to be
executed on a multi-node cluster (2 or more slaves and a master), with 4
(or more) task slots each:

Use-case 1:

- I have a stream of words, a mapping function that performs some
computation for each word and several slaves in a Flink cluster.
- I would like words starting with the same letter to be routed to the same
slave.

------------------------------------------------------------

Use-case 2:

- I have a stream of words, a mapping function that performs some
computation for each word and several slaves in a Flink cluster.
- Not all slaves can process all words, and which slaves can process which
words only becomes known at runtime (e.g. through a configuration file in
the slaves' local filesystem).
- How can I achieve exactly-once processing of each word in this setting?

I understand that using a shared store (HDFS/Flink shared variable) to
contain this config information may be one approach, but we are
investigating whether there is an alternative (possibly more elegant)
solution using Flink's capabilities whilst retaining locality of config files.

------------------------------------------------------------

Thank you in advance for your time and help.


-- 
Konstantinos Barmpis | Research Associate
White Rose Grid Enterprise Systems Group
Dept. of Computer Science
University of York
Tel: +44 (0) 1904-32 5653

Email Disclaimer:
http://www.york.ac.uk/docs/disclaimer/email.htm

Re: regarding the use of colocation groups

Posted by Konstantinos Barmpis <ko...@york.ac.uk>.
Hello Chesnay,

Thank you for your prompt reply and helpful feedback on the matter.

For the first case-study a partitioner seems like a good idea. We have
managed to get it working on the default local minicluster but
unfortunately are running into issues deploying it on AWS. Once we pinpoint
the issue we may get back to you for further clarification.

For the second case-study it seems reasonable to contain these
configurations in a secured shared state whereby only the program can
access them and it can then distribute them accordingly to available
slaves. This would delegate the distribution logic to our algorithms
instead, but we do not currently envision this being a big limitation.

Cheers,



On 27 March 2018 at 14:40, Chesnay Schepler <ch...@apache.org> wrote:

> Hello,
>
> your first use-case should be achievable by using a custom partitioner
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>,
> probably with a KeySelector that returns the word.
>
> As for the second use-case, typically this would be achieved by storing
> the config in state, where each part of the config is distributed just like
> the words. Conceptually, this is the trivial solution that allows making
> use of all the reliability guarantees that Flink provides and will also
> continue to work when changing the parallelism or number of nodes.
>
> Would storing them in state be feasible for you, or are they too large for
> that?



Re: regarding the use of colocation groups

Posted by Chesnay Schepler <ch...@apache.org>.
Hello,

your first use-case should be achievable by using a custom partitioner 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#physical-partitioning>, 
probably with a KeySelector that returns the word.
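
As a rough illustration of the partitioning logic described above, here is a
minimal sketch in plain Java. It deliberately does not import Flink; it only
mirrors the shape of Flink's Partitioner<K> interface, whose single method is
int partition(K key, int numPartitions). The class name FirstLetterPartitioner
and the handling of empty words are assumptions made for this example. Note
that the returned index selects a parallel subtask of the downstream operator
rather than a specific machine.

```java
// Sketch only: mirrors the shape of Flink's Partitioner<K>
// (int partition(K key, int numPartitions)) without depending on Flink.
// The class name is hypothetical.
final class FirstLetterPartitioner {

    // The key is the whole word, as a KeySelector returning the word would
    // supply it. Words with the same first letter (case-insensitive) get the
    // same partition index in [0, numPartitions).
    static int partition(String word, int numPartitions) {
        if (word == null || word.isEmpty()) {
            return 0; // assumption: empty words all go to partition 0
        }
        char first = Character.toLowerCase(word.charAt(0));
        return Math.floorMod((int) first, numPartitions);
    }
}
```

In an actual job, a method like this could back a Partitioner<String> passed
to DataStream#partitionCustom together with a KeySelector that returns the
word. All words sharing a first letter then reach the same subtask, and
therefore the same task slot, for as long as the parallelism is unchanged.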

As for the second use-case, typically this would be achieved by storing 
the config in state, where each part of the config is distributed just 
like the words. Conceptually, this is the trivial solution that allows 
making use of all the reliability guarantees that Flink provides and 
will also continue to work when changing the parallelism or number of nodes.
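
To make the idea of distributing the config "just like the words" more
concrete, here is a small plain-Java sketch. It is only a stand-in for keyed
state and does not use Flink: the two maps play the role that per-key managed
state would play in a real job, where Flink would checkpoint them. The class
name KeyedConfigSketch, keying by first letter, the buffering of words that
arrive before their config, and the apply function are all assumptions made
for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a plain-Java stand-in for keyed state (names are hypothetical).
final class KeyedConfigSketch {
    // Config entry per key; in Flink this would be keyed managed state.
    private final Map<Character, String> configByKey = new HashMap<>();
    // Words that arrived before the config for their key was known.
    private final Map<Character, List<String>> pendingWords = new HashMap<>();

    // Assumption: words and config entries are both keyed by a first letter.
    static char keyOf(String word) {
        return Character.toLowerCase(word.charAt(0));
    }

    // Stores a config entry under its key and releases any buffered words.
    List<String> onConfig(char key, String config) {
        configByKey.put(key, config);
        List<String> out = new ArrayList<>();
        List<String> waiting = pendingWords.remove(key);
        if (waiting != null) {
            for (String w : waiting) {
                out.add(apply(config, w));
            }
        }
        return out;
    }

    // Processes a word with the config stored for its key, or buffers it.
    List<String> onWord(String word) {
        char key = keyOf(word);
        String config = configByKey.get(key);
        List<String> out = new ArrayList<>();
        if (config == null) {
            pendingWords.computeIfAbsent(key, k -> new ArrayList<>()).add(word);
        } else {
            out.add(apply(config, word));
        }
        return out;
    }

    // Placeholder for the real per-word computation.
    private static String apply(String config, String word) {
        return config + ":" + word;
    }
}
```

Because the config entry and the words share a key, they always meet in the
same place regardless of how many parallel instances exist, which is what
lets this approach keep working when the parallelism changes.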

Would storing them in state be feasible for you, or are they too large
for that?
