Posted to dev@flink.apache.org by Ovidiu-Cristian MARCU <ov...@inria.fr> on 2017/02/20 10:46:04 UTC

KeyGroupRangeAssignment ?

Hi,

Can you please comment on how I can ensure that stream input records are distributed evenly across task slots?
See the attached screenshot (Records received issue).

I have a simple application that applies a window function over a stream partitioned as follows:
(parallelism is equal to the number of keys; records with the same key are streamed evenly)

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);
DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
		.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
		...
		});
counts1.writeAsText(params.get("output1"));
env.execute("Socket Window WordCount");

Best,
Ovidiu

Re: KeyGroupRangeAssignment ?

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Filed as https://issues.apache.org/jira/browse/FLINK-5873

Best,
Ovidiu

> On 21 Feb 2017, at 12:00, Ovidiu-Cristian MARCU <ov...@inria.fr> wrote:
> 
> Hi Till,
> 
> I will look into filing a JIRA issue.
> 
> Regarding the key group assignment, you're right, there was a mistake in my code. Here are the corrected code and the resulting distribution
> (numServers is maxParallelism):
> 
> int numKeys = 1024;
> for (int numServers = 2; numServers < 17; numServers++) {
>     // reset the counters for each numServers value
>     HashMap<Integer, Integer> groups = new HashMap<Integer, Integer>();
>     for (int i = 0; i < numKeys; i++) {
>         int targetKeyGroupIndex = MathUtils.murmurHash(i) % numServers;
>         Integer mygroup = groups.get(targetKeyGroupIndex);
>         int count = mygroup == null ? 0 : mygroup;
>         groups.put(targetKeyGroupIndex, ++count);
>     }
>     System.out.println(groups + " " + numServers);
> }
> 
> {0=517, 1=507} 2
> {0=364, 1=302, 2=358} 3
> {0=258, 1=239, 2=259, 3=268} 4
> {0=180, 1=220, 2=212, 3=205, 4=207} 5
> {0=193, 1=157, 2=179, 3=171, 4=145, 5=179} 6
> {0=144, 1=161, 2=152, 3=137, 4=160, 5=131, 6=139} 7
> {0=125, 1=132, 2=120, 3=127, 4=133, 5=107, 6=139, 7=141} 8
> {0=120, 1=110, 2=115, 3=123, 4=93, 5=112, 6=121, 7=99, 8=131} 9
> {0=95, 1=106, 2=98, 3=103, 4=108, 5=85, 6=114, 7=114, 8=102, 9=99} 10
> {0=98, 1=83, 2=84, 3=92, 4=89, 5=99, 6=97, 7=80, 8=126, 9=75, 10=101} 11
> {0=98, 1=74, 2=92, 3=90, 4=73, 5=84, 6=95, 7=83, 8=87, 9=81, 10=72, 11=95} 12
> {0=65, 1=84, 2=72, 3=80, 4=71, 5=85, 6=80, 7=79, 8=78, 9=85, 10=81, 11=91, 12=73} 13
> {0=73, 1=83, 2=75, 3=62, 4=81, 5=69, 6=73, 7=71, 8=78, 9=77, 10=75, 11=79, 12=62, 13=66} 14
> {0=67, 1=65, 2=81, 3=84, 4=73, 5=57, 6=76, 7=56, 8=69, 9=62, 10=56, 11=79, 12=75, 13=52, 14=72} 15
> {0=57, 1=72, 2=52, 3=61, 4=63, 5=47, 6=64, 7=80, 8=68, 9=60, 10=68, 11=66, 12=70, 13=60, 14=75, 15=61} 16
> 
> 
> Best,
> Ovidiu
> 
>> On 21 Feb 2017, at 10:52, Till Rohrmann <trohrmann@apache.org> wrote:
>> 
>> Hi Ovidiu,
>> 
>> at the moment it is not possible to plug in a user-defined hash function/key
>> group assignment function. If you like, you can file a JIRA issue to
>> add this functionality.
>> 
>> The key group assignment in your example looks quite skewed. One question
>> concerning how you calculated it: Shouldn't the number of elements in each
>> group sum up to 1024? This only works for the first case. What do the
>> numbers mean then?
>> 
>> Cheers,
>> Till
>> 
>> On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.marcu@inria.fr> wrote:
>> 
>>> Hi,
>>> 
>>> Thank you for clarifications (I am working with KeyedStream so a custom
>>> partitioner does not help).
>>> 
>>> So I should set maxParallelism >= parallelism and change my keys (from
>>> input.keyBy(0)) such that key group assignment works as expected,
>>> but I can’t modify these keys in order to make it work.
>>> 
>>> The other option is to change Flink’s internals in order to evenly
>>> distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
>>> What I was looking for was an API to change the way key group assignment
>>> is done, but without changing Flink’s runtime.
>>> 
>>> I think that the maxParallelism setting is not enough (it introduces this
>>> inefficient way of distributing data for processing when using KeyedStream).
>>> Is it possible to somehow expose the key group assignment?
>>> 
>>> This is how keys are distributed (1024 keys, key=1..1024; and groups from
>>> 2 to 16 - equiv. parallelism that is number of slots):
>>> 
>>> {0=517, 1=507} 2
>>> {0=881, 1=809, 2=358} 3
>>> {0=1139, 1=1048, 2=617, 3=268} 4
>>> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
>>> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
>>> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
>>> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
>>> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
>>> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
>>> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 10=101} 11
>>> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 10=173, 11=95} 12
>>> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 10=254, 11=186, 12=73} 13
>>> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 10=329, 11=265, 12=135, 13=66} 14
>>> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
>>> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
>>> 
>>> Best,
>>> Ovidiu
>>> 
>>>> On 20 Feb 2017, at 12:04, Till Rohrmann <trohrmann@apache.org> wrote:
>>>> 
>>>> Hi Ovidiu,
>>>> 
>>>> the way Flink works is to assign key group ranges to operators. For each
>>>> element you calculate a hash value and based on that you assign it to a key
>>>> group. Thus, in your example, you have either a key group with more than 1
>>>> key or multiple key groups with 1 or more keys assigned to an operator.
>>>> 
>>>> So what you could try to do is to reduce the number of key groups to
>>>> your parallelism via env.setMaxParallelism() and then try to figure out
>>>> keys whose hashes are uniformly distributed over the key groups. The key
>>>> group assignment is calculated via murmurHash(key.hashCode()) %
>>>> maxParallelism.
>>>> 
>>>> Alternatively, if you don’t need a keyed stream, you could try to use a
>>>> custom partitioner via DataStream.partitionCustom.
>>>> 
>>>> Cheers,
>>>> Till


Re: KeyGroupRangeAssignment ?

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Hi Till,

I will look into filing a JIRA issue.

Regarding the key group assignment, you're right, there was a mistake in my code. Here are the corrected code and the resulting distribution
(numServers is maxParallelism):

int numKeys = 1024;
for (int numServers = 2; numServers < 17; numServers++) {
    // reset the counters for each numServers value
    HashMap<Integer, Integer> groups = new HashMap<Integer, Integer>();
    for (int i = 0; i < numKeys; i++) {
        int targetKeyGroupIndex = MathUtils.murmurHash(i) % numServers;
        Integer mygroup = groups.get(targetKeyGroupIndex);
        int count = mygroup == null ? 0 : mygroup;
        groups.put(targetKeyGroupIndex, ++count);
    }
    System.out.println(groups + " " + numServers);
}

{0=517, 1=507} 2
{0=364, 1=302, 2=358} 3
{0=258, 1=239, 2=259, 3=268} 4
{0=180, 1=220, 2=212, 3=205, 4=207} 5
{0=193, 1=157, 2=179, 3=171, 4=145, 5=179} 6
{0=144, 1=161, 2=152, 3=137, 4=160, 5=131, 6=139} 7
{0=125, 1=132, 2=120, 3=127, 4=133, 5=107, 6=139, 7=141} 8
{0=120, 1=110, 2=115, 3=123, 4=93, 5=112, 6=121, 7=99, 8=131} 9
{0=95, 1=106, 2=98, 3=103, 4=108, 5=85, 6=114, 7=114, 8=102, 9=99} 10
{0=98, 1=83, 2=84, 3=92, 4=89, 5=99, 6=97, 7=80, 8=126, 9=75, 10=101} 11
{0=98, 1=74, 2=92, 3=90, 4=73, 5=84, 6=95, 7=83, 8=87, 9=81, 10=72, 11=95} 12
{0=65, 1=84, 2=72, 3=80, 4=71, 5=85, 6=80, 7=79, 8=78, 9=85, 10=81, 11=91, 12=73} 13
{0=73, 1=83, 2=75, 3=62, 4=81, 5=69, 6=73, 7=71, 8=78, 9=77, 10=75, 11=79, 12=62, 13=66} 14
{0=67, 1=65, 2=81, 3=84, 4=73, 5=57, 6=76, 7=56, 8=69, 9=62, 10=56, 11=79, 12=75, 13=52, 14=72} 15
{0=57, 1=72, 2=52, 3=61, 4=63, 5=47, 6=64, 7=80, 8=68, 9=60, 10=68, 11=66, 12=70, 13=60, 14=75, 15=61} 16


Best,
Ovidiu
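
For anyone who wants to repeat the experiment above without a Flink dependency, here is a minimal self-contained sketch. The class name KeyGroupDistribution and the methods mixHash and distribute are made up for illustration. mixHash is only a stand-in in the style of MurmurHash3 and is not guaranteed to produce the same values as Flink's MathUtils.murmurHash, so the exact counts may differ from the ones listed in this message. The sketch also prints the per-row total, which is the sanity check Till asked about (every row should sum to 1024).

```java
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupDistribution {

    // Stand-in 32-bit mixing function in the style of MurmurHash3.
    // ASSUMPTION: not guaranteed to match Flink's MathUtils.murmurHash.
    static int mixHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Clear the sign bit so that the modulo below is never negative.
        return code & Integer.MAX_VALUE;
    }

    // Counts how many of the keys 0..numKeys-1 fall into each of numGroups groups.
    static Map<Integer, Integer> distribute(int numKeys, int numGroups) {
        Map<Integer, Integer> groups = new TreeMap<>();
        for (int key = 0; key < numKeys; key++) {
            int group = mixHash(key) % numGroups;
            groups.merge(group, 1, Integer::sum);
        }
        return groups;
    }

    public static void main(String[] args) {
        int numKeys = 1024;
        for (int numGroups = 2; numGroups < 17; numGroups++) {
            Map<Integer, Integer> groups = distribute(numKeys, numGroups);
            int total = groups.values().stream().mapToInt(Integer::intValue).sum();
            System.out.println(groups + " " + numGroups + " (total " + total + ")");
        }
    }
}
```

A fresh map is created inside distribute on every call, so counts cannot accumulate across different group counts.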



Re: KeyGroupRangeAssignment ?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ovidiu,

given your experiments, the data distribution does not look too bad. Maybe
you won't get exactly 10 keys per operator, but it should be around 10
keys per operator. Have you tried measuring the latency difference between the
fastest and slowest subtask in your topology?

But I've also heard from other users that controlling the hash function
would be useful to leverage existing partitionings as they might exist in
Kafka in order to avoid shuffles, for example.

Cheers,
Till

On Tue, Feb 21, 2017 at 5:21 PM, Ovidiu-Cristian MARCU <
ovidiu-cristian.marcu@inria.fr> wrote:

> My case is the following: I have one stream source of elements, each
> element contains some key.
> I create a KeyedStream and then window it (so I get a WindowedStream) on
> top of which I apply some window function.
>
> Some numbers for my problem: 1 million records, 1000 keys.
> I assume parallelism is 100 and maxParallelism is 200.
> So each slot will take 2 groups, and I need to split my records evenly into
> groups such that each slot task will process an equal number of records.
>
> In the end, assuming the key is an integer (1 to 1000), I change the
> method assignToKeyGroup such that it returns key%maxParallelism.
> This solves my problem.
>
> Because the rate of elements is the same for each key, the load on a slot
> is proportional to the number of keys it receives. The current code does not
> ensure an equal distribution of keys, so skewed computation gives skewed latency.
>
> However, I would like to be able to change the way Flink assigns keys
> to key groups without changing the runtime.
>
> I’m using keyBy for windowed transformations; I hope that makes sense.
> I understand key groups are a nice way of rescaling an application.
>
> Best,
> Ovidiu
>
> > On 21 Feb 2017, at 16:18, Aljoscha Krettek <al...@apache.org> wrote:
> >
> > I'm afraid that won't work because we also internally use murmur hash on
> > the result of hashCode().
> >
> > @Ovidiu I still want to understand why you want to use keyBy() for that
> > case. It sounds like you want to use it because you would like to do
> > something else but that is not possible with the Flink APIs. The fact that
> > key groups exist is more of an implementation detail and exposing that to
> > users does not seem like the right way to go.
> >
> > On Tue, 21 Feb 2017 at 16:10 Greg Hogan <co...@greghogan.com> wrote:
> >
> >> Integer's hashCode is the identity function. Store your slot index in an
> >> Integer or IntValue and key off that field.
> >>
> >> On Tue, Feb 21, 2017 at 6:04 AM, Ovidiu-Cristian MARCU <
> >> ovidiu-cristian.marcu@inria.fr> wrote:
> >>
> >>> Hi,
> >>>
> >>> As in my example, each key is a window, so I want to evenly distribute
> >>> processing across all slots.
> >>> If I have 100 keys and 100 slots, and the same rate of events for each
> >>> key, I don’t want a skewed distribution.
> >>>
> >>> Best,
> >>> Ovidiu
> >>>
> >>>> On 21 Feb 2017, at 11:38, Aljoscha Krettek <al...@apache.org> wrote:
> >>>>
> >>>> Hi Ovidiu,
> >>>> what's the reason for wanting to make the parallelism equal to the number
> >>>> of keys? I think in general it's very hard to ensure that hashes even go to
> >>>> different key groups. It can always happen that all your keys (if you have
> >>>> so few of them) are assigned to the same parallel operator instance.
> >>>>
> >>>> Cheers,
> >>>> Aljoscha

Re: KeyGroupRangeAssignment ?

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
My case is the following: I have one stream source of elements, each element contains some key.
I create a KeyedStream and then window it (so I get a WindowedStream) on top of which I apply some window function.

Some numbers for my problem: 1 million records, 1000 keys.
I assume parallelism is 100 and maxParallelism is 200.
So each slot will take 2 groups, and I need to split my records evenly into groups such that each slot task will process an equal number of records.

In the end, assuming the key is an integer (1 to 1000), I change the method assignToKeyGroup such that it returns key%maxParallelism.
This solves my problem.

Because the rate of elements is the same for each key, the load on a slot is proportional to the number of keys it receives. The current code does not ensure an equal distribution of keys, so skewed computation gives skewed latency.

However, I would like to be able to change the way Flink assigns keys to key groups without changing the runtime.

I’m using keyBy for windowed transformations; I hope that makes sense.
I understand key groups are a nice way of rescaling an application.

Best,
Ovidiu
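
The numbers in this message can be checked with a small standalone sketch. It does not use Flink. The class ModuloKeyGroups and its methods are hypothetical names. The group-to-operator mapping (keyGroup * parallelism / maxParallelism) is an assumption about how contiguous key group ranges are handed to parallel operator instances, so please verify it against the Flink version in use.

```java
import java.util.Arrays;

public class ModuloKeyGroups {

    // Key group taken directly from the integer key, as described in the message.
    // floorMod equals key % maxParallelism for the non-negative keys used here.
    static int assignToKeyGroup(int key, int maxParallelism) {
        return Math.floorMod(key, maxParallelism);
    }

    // ASSUMPTION: contiguous ranges of key groups map to operator indices by
    // scaling the group id; check this against the Flink runtime you use.
    static int operatorIndexForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many of the keys 1..numKeys end up on each operator instance.
    static int[] keysPerOperator(int numKeys, int parallelism, int maxParallelism) {
        int[] counts = new int[parallelism];
        for (int key = 1; key <= numKeys; key++) {
            int group = assignToKeyGroup(key, maxParallelism);
            counts[operatorIndexForKeyGroup(group, parallelism, maxParallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 1000 keys, parallelism 100, maxParallelism 200, as in the message.
        int[] counts = keysPerOperator(1000, 100, 200);
        System.out.println(Arrays.toString(counts));
    }
}
```

Under these assumptions each of the 200 key groups receives 5 keys and each operator instance owns 2 groups, so every instance handles 10 keys. This only works so evenly because the keys are consecutive integers; arbitrary keys would not spread like this under a plain modulo.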



Re: KeyGroupRangeAssignment ?

Posted by Aljoscha Krettek <al...@apache.org>.
I'm afraid that won't work because we also internally use murmur hash on
the result of hashCode().
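
To illustrate the point, here is a minimal sketch, assuming the formula quoted in this thread (murmurHash(key.hashCode()) % maxParallelism). The murmurHash below is a murmur3-style mix written for this example; it is a stand-in and may not be bit-identical to Flink's MathUtils.murmurHash. The class and method names are hypothetical.

```java
final class KeyGroupSketch {

    // Murmur3-style mix of a single int (stand-in written for this example).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // final avalanche step
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // fold into the non-negative range so that % yields a valid index
        if (code >= 0) {
            return code;
        }
        return code == Integer.MIN_VALUE ? 0 : -code;
    }

    // Key group as described in the thread: hash of hashCode(), modulo maxParallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        for (int slot = 0; slot < maxParallelism; slot++) {
            Integer key = slot; // Integer.hashCode() is the identity
            System.out.println("key " + key + " hashCode " + key.hashCode()
                    + " -> key group " + keyGroupFor(key, maxParallelism));
        }
    }
}
```

Even though Integer.hashCode() returns the value itself, the key group is derived from the hashed value, so a key holding slot index i is not expected to land in key group i.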

@Ovidiu I still want to understand why you want to use keyBy() for that
case. It sounds like you want to use it because you would like to do
something else that is not possible with the Flink APIs. The fact that
key groups exist is more of an implementation detail, and exposing that to
users does not seem like the right way to go.

On Tue, 21 Feb 2017 at 16:10 Greg Hogan <co...@greghogan.com> wrote:

> Integer's hashCode is the identity function. Store your slot index in an
> Integer or IntValue and key off that field.

Re: KeyGroupRangeAssignment ?

Posted by Greg Hogan <co...@greghogan.com>.
Integer's hashCode is the identity function. Store your slot index in an
Integer or IntValue and key off that field.

On Tue, Feb 21, 2017 at 6:04 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.marcu@inria.fr> wrote:

> Hi,
>
> As in my example, each key is a window, so I want to distribute
> processing evenly across all slots.
> If I have 100 keys and 100 slots, and every key has the same rate of
> events, I don’t want a skewed distribution.
>
> Best,
> Ovidiu

Re: KeyGroupRangeAssignment ?

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Hi,

As in my example, each key is a window, so I want to distribute processing evenly across all slots.
If I have 100 keys and 100 slots, and every key has the same rate of events, I don’t want a skewed distribution.

Best,
Ovidiu

> On 21 Feb 2017, at 11:38, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi Ovidiu,
> what's the reason for wanting to make the parallelism equal to the number
> of keys? I think in general it's very hard to ensure that hashes even go to
> different key groups. It can always happen that all your keys (if you have
> so few of them) are assigned to the same parallel operator instance.
> 
> Cheers,
> Aljoscha


Re: KeyGroupRangeAssignment ?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Ovidiu,
what's the reason for wanting to make the parallelism equal to the number
of keys? I think in general it's very hard to ensure that hashes even go to
different key groups. It can always happen that all your keys (if you have
so few of them) are assigned to the same parallel operator instance.
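
A rough sketch of why this happens, under stated assumptions: if the hash behaves like a uniform random function, throwing n keys into n slots leaves an expected n * (1 - (1 - 1/n)^n) slots busy, which is about 63% for large n. The code below compares that estimate with a count obtained from a murmur3-style stand-in hash (not guaranteed to match Flink's MathUtils.murmurHash). The operator index formula keyGroup * parallelism / maxParallelism is my understanding of how key groups are mapped to parallel instances; treat it, and all names here, as assumptions.

```java
import java.util.BitSet;

final class SlotOccupancySketch {

    // Murmur3-style mix of a single int (stand-in written for this example).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code == Integer.MIN_VALUE ? 0 : -code;
    }

    // Assumed mapping from key group to parallel operator instance.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Number of operator instances that receive at least one of the keys 0..numKeys-1.
    static int busySlots(int numKeys, int maxParallelism, int parallelism) {
        BitSet used = new BitSet(parallelism);
        for (int key = 0; key < numKeys; key++) {
            int keyGroup = murmurHash(Integer.hashCode(key)) % maxParallelism;
            used.set(operatorIndexFor(keyGroup, maxParallelism, parallelism));
        }
        return used.cardinality();
    }

    // Expected number of non-empty slots for n keys placed uniformly at random into n slots.
    static double expectedBusySlots(int n) {
        return n * (1.0 - Math.pow(1.0 - 1.0 / n, n));
    }

    public static void main(String[] args) {
        int n = 100; // 100 keys, 100 slots, maxParallelism set equal to parallelism
        System.out.println("busy slots (stand-in hash): " + busySlots(n, n, n));
        System.out.println("expected under uniform hashing: " + expectedBusySlots(n));
    }
}
```

With as many keys as slots, some instances stay idle while others receive several keys, unless the keys happen to hash to distinct key groups.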

Cheers,
Aljoscha

On Tue, 21 Feb 2017 at 10:53 Till Rohrmann <tr...@apache.org> wrote:

> Hi Ovidiu,
>
> at the moment it is not possible to plug in a user-defined hash function/key
> group assignment function. If you like, you can file a JIRA issue to
> add this functionality.
>
> The key group assignment in your example looks quite skewed. One question
> concerning how you calculated it: Shouldn't the number of elements in each
> group sum up to 1024? This only holds for the first case. What do the
> numbers mean then?
>
> Cheers,
> Till

Re: KeyGroupRangeAssignment ?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ovidiu,

at the moment it is not possible to plug in a user-defined hash function/key
group assignment function. If you like, you can file a JIRA issue to
add this functionality.

The key group assignment in your example looks quite skewed. One question
concerning how you calculated it: Shouldn't the number of elements in each
group sum up to 1024? This only holds for the first case. What do the
numbers mean then?
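
For reference, the first three quoted rows sum to 1024, 2048 and 3072, which is what one would see if the counts were carried over from one iteration to the next. Below is a minimal sketch of the histogram with a fresh map per maxParallelism value and an explicit sum check. The hash is a murmur3-style stand-in written for this example (possibly not bit-identical to MathUtils.murmurHash), and all names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class KeyGroupHistogramSketch {

    // Murmur3-style mix of a single int (stand-in written for this example).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code == Integer.MIN_VALUE ? 0 : -code;
    }

    // Counts how many of the keys 0..numKeys-1 fall into each key group.
    static Map<Integer, Integer> histogram(int numKeys, int maxParallelism) {
        Map<Integer, Integer> groups = new TreeMap<>(); // fresh map on every call
        for (int key = 0; key < numKeys; key++) {
            int keyGroup = murmurHash(Integer.hashCode(key)) % maxParallelism;
            groups.merge(keyGroup, 1, Integer::sum);
        }
        return groups;
    }

    // Sum of all counts; should equal the number of keys.
    static int total(Map<Integer, Integer> histogram) {
        int sum = 0;
        for (int count : histogram.values()) {
            sum += count;
        }
        return sum;
    }

    public static void main(String[] args) {
        int numKeys = 1024;
        for (int maxParallelism = 2; maxParallelism < 17; maxParallelism++) {
            Map<Integer, Integer> groups = histogram(numKeys, maxParallelism);
            System.out.println(groups + " " + maxParallelism + " (sum=" + total(groups) + ")");
        }
    }
}
```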

Cheers,
Till

On Mon, Feb 20, 2017 at 3:45 PM, Ovidiu-Cristian MARCU <
ovidiu-cristian.marcu@inria.fr> wrote:

> Hi,
>
> Thank you for clarifications (I am working with KeyedStream so a custom
> partitioner does not help).
>
> So I should set maxParallelism>=parallelism and change my keys (from
> input.keyBy(0)) such that key group assignment works as expected),
> but I can’t modify these keys in order to make it work.
>
> The other option is to change Flink’s internals in order to evenly
> distribute keys (changing computeKeyGroupForKeyHash: is this enough?).
> What I was looking for was an api to change the way key group assignment
> is done, but without changing Flink’s runtime.
>
> I think that the maxParallelism setting is not enough (it introduces this
> inefficient way of distributing data for processing when using KeyedStream).
> Is it possible to expose somehow the key group assignment?
>
> This is how keys are distributed (1024 keys, key=1..1024; and groups from
> 2 to 16 - equiv. parallelism that is number of slots):
>
> {0=517, 1=507} 2
> {0=881, 1=809, 2=358} 3
> {0=1139, 1=1048, 2=617, 3=268} 4
> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 10=101} 11
> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 10=173, 11=95} 12
> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 10=254, 11=186, 12=73} 13
> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 10=329, 11=265, 12=135, 13=66} 14
> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
>
> Best,
> Ovidiu

Re: KeyGroupRangeAssignment ?

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Hi,

Any thoughts on this issue? It relates to what Till proposed ('to figure a key out whose hashes are uniformly distributed over the key groups') and to a way of exposing the key group assignment through the API.

I wonder how other users are dealing with this issue.

A small set of keys (as used in input.keyBy) could easily be handled with some sort of local mapping, but I am considering a use case with millions of keys.
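
For the small-key-set case only, the local mapping could look roughly like the sketch below: search once for one surrogate integer per key group, then key the stream by the surrogate of the i-th original key instead of the original key. This assumes maxParallelism equals parallelism, so that each key group belongs to exactly one operator instance. The hash is a murmur3-style stand-in written for this example; with real Flink the search would have to use Flink's own assignment function. All names are hypothetical.

```java
final class SurrogateKeySketch {

    // Murmur3-style mix of a single int (stand-in written for this example).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code == Integer.MIN_VALUE ? 0 : -code;
    }

    static int keyGroupFor(int key, int maxParallelism) {
        return murmurHash(Integer.hashCode(key)) % maxParallelism;
    }

    // For every key group g, finds the smallest non-negative int whose key group is g.
    static int[] surrogateKeys(int maxParallelism) {
        int[] surrogate = new int[maxParallelism];
        boolean[] found = new boolean[maxParallelism];
        int remaining = maxParallelism;
        // bounded search so the loop cannot run forever
        for (int candidate = 0; remaining > 0 && candidate < 10_000_000; candidate++) {
            int keyGroup = keyGroupFor(candidate, maxParallelism);
            if (!found[keyGroup]) {
                found[keyGroup] = true;
                surrogate[keyGroup] = candidate;
                remaining--;
            }
        }
        if (remaining > 0) {
            throw new IllegalStateException("no surrogate found for " + remaining + " key groups");
        }
        return surrogate;
    }

    public static void main(String[] args) {
        int maxParallelism = 16;
        int[] surrogate = surrogateKeys(maxParallelism);
        for (int g = 0; g < maxParallelism; g++) {
            System.out.println("key group " + g + " <- surrogate key " + surrogate[g]);
        }
    }
}
```

The i-th of N original keys would then be keyed by surrogate[i % maxParallelism], which spreads N = maxParallelism keys over all key groups.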

Best,
Ovidiu


> On 20 Feb 2017, at 15:45, Ovidiu-Cristian MARCU <ov...@inria.fr> wrote:
> 
> Hi,
> 
> Thank you for the clarifications (I am working with a KeyedStream, so a custom partitioner does not help).
> 
> So I should set maxParallelism >= parallelism and change my keys (from input.keyBy(0)) such that the key group assignment works as expected,
> but I can’t modify these keys in order to make it work.
> 
> The other option is to change Flink’s internals in order to distribute keys evenly (changing computeKeyGroupForKeyHash: is this enough?).
> What I was looking for was an API to change the way key group assignment is done, without changing Flink’s runtime.
> 
> I think that the maxParallelism setting is not enough (it introduces this inefficient way of distributing data for processing when using a KeyedStream).
> Is it possible to expose the key group assignment somehow?
> 
> This is how keys are distributed (1024 keys, key = 1..1024; number of groups from 2 to 16, equivalent to the parallelism, i.e. the number of slots):
> 
> {0=517, 1=507} 2
> {0=881, 1=809, 2=358} 3
> {0=1139, 1=1048, 2=617, 3=268} 4
> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 10=101} 11
> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 10=173, 11=95} 12
> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 10=254, 11=186, 12=73} 13
> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 10=329, 11=265, 12=135, 13=66} 14
> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
> 
> Best,
> Ovidiu
> 
>> On 20 Feb 2017, at 12:04, Till Rohrmann <tr...@apache.org> wrote:
>> 
>> Hi Ovidiu,
>> 
>> The way Flink works is to assign key group ranges to operators. For each element you calculate a hash value and based on that you assign it to a key group. Thus, in your example, you have either a key group with more than 1 key or multiple key groups with 1 or more keys assigned to an operator.
>> 
>> So what you could try to do is to reduce the number of key groups to your parallelism via env.setMaxParallelism() and then try to figure a key out whose hashes are uniformly distributed over the key groups. The key group assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
>> 
>> Alternatively if you don’t need a keyed stream, you could try to use a custom partitioner via DataStream.partitionCustom.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <ovidiu-cristian.marcu@inria.fr> wrote:
>> Hi,
>> 
>> Can you please comment on how can I ensure stream input records are distributed evenly onto task slots?
>> See attached screen Records received issue.
>> 
>> I have a simple application which is applying some window function over a stream partitioned as follows:
>> (parallelism is equal to the number of keys; records with the same key are streamed evenly)
>> 
>> // get the execution environment
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // get input data by connecting to the socket
>> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
>> DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);
>> DataStream<Double> counts1 = null;
>> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>> 		.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>> 		...
>> 		});
>> counts1.writeAsText(params.get("output1"));
>> env.execute("Socket Window WordCount");
>> 
>> Best,
>> Ovidiu
>> 
>> 
> 


Re: KeyGroupRangeAssignment ?

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Hi,

Thank you for the clarifications (I am working with a KeyedStream, so a custom partitioner does not help).

So I should set maxParallelism >= parallelism and change my keys (from input.keyBy(0)) such that the key group assignment works as expected,
but I can’t modify these keys in order to make it work.

The other option is to change Flink’s internals in order to distribute keys evenly (changing computeKeyGroupForKeyHash: is this enough?).
What I was looking for was an API to change the way key group assignment is done, without changing Flink’s runtime.

I think that the maxParallelism setting is not enough (it introduces this inefficient way of distributing data for processing when using a KeyedStream).
Is it possible to expose the key group assignment somehow?

This is how keys are distributed (1024 keys, key = 1..1024; number of groups from 2 to 16, equivalent to the parallelism, i.e. the number of slots):

{0=517, 1=507} 2
{0=881, 1=809, 2=358} 3
{0=1139, 1=1048, 2=617, 3=268} 4
{0=1319, 1=1268, 2=829, 3=473, 4=207} 5
{0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
{0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
{0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
{0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
{0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
{0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 10=101} 11
{0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 10=173, 11=95} 12
{0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 10=254, 11=186, 12=73} 13
{0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 10=329, 11=265, 12=135, 13=66} 14
{0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
{0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
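
A standalone sketch of how a per-key-group histogram of this kind could be computed, assuming the assignment is murmurHash(key.hashCode()) % maxParallelism. The hash body is modeled on Flink's MathUtils.murmurHash and is an assumption to verify against your Flink version; the names KeyGroupHistogram and histogram are hypothetical. Since every key is counted exactly once, each row should sum to the number of keys, which makes a handy sanity check:

```java
import java.util.TreeMap;

// Standalone sketch; it does not depend on Flink.
class KeyGroupHistogram {

    // Assumed to mirror Flink's MathUtils.murmurHash(int); verify before relying on it.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // final bit mixing
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // fold the result into the non-negative range
        if (code >= 0) return code;
        if (code != Integer.MIN_VALUE) return -code;
        return 0;
    }

    // Counts how many of the keys 1..numKeys fall into each of numGroups key groups.
    static TreeMap<Integer, Integer> histogram(int numKeys, int numGroups) {
        TreeMap<Integer, Integer> groups = new TreeMap<>();
        for (int key = 1; key <= numKeys; key++) {
            // For an Integer key, hashCode() is the int value itself.
            int group = murmurHash(Integer.hashCode(key)) % numGroups;
            groups.merge(group, 1, Integer::sum);
        }
        return groups;
    }

    public static void main(String[] args) {
        int numKeys = 1024;
        for (int numGroups = 2; numGroups <= 16; numGroups++) {
            // Prints one row per group count: the histogram followed by the number of groups.
            System.out.println(histogram(numKeys, numGroups) + " " + numGroups);
        }
    }
}
```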

Best,
Ovidiu

> On 20 Feb 2017, at 12:04, Till Rohrmann <tr...@apache.org> wrote:
> 
> Hi Ovidiu,
> 
> The way Flink works is to assign key group ranges to operators. For each element you calculate a hash value and based on that you assign it to a key group. Thus, in your example, you have either a key group with more than 1 key or multiple key groups with 1 or more keys assigned to an operator.
> 
> So what you could try to do is to reduce the number of key groups to your parallelism via env.setMaxParallelism() and then try to figure a key out whose hashes are uniformly distributed over the key groups. The key group assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
> 
> Alternatively if you don’t need a keyed stream, you could try to use a custom partitioner via DataStream.partitionCustom.
> 
> Cheers,
> Till
> 
> 
> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <ovidiu-cristian.marcu@inria.fr> wrote:
> Hi,
> 
> Can you please comment on how can I ensure stream input records are distributed evenly onto task slots?
> See attached screen Records received issue.
> 
> I have a simple application which is applying some window function over a stream partitioned as follows:
> (parallelism is equal to the number of keys; records with the same key are streamed evenly)
> 
> // get the execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data by connecting to the socket
> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);
> DataStream<Double> counts1 = null;
> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
> 		.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
> 		...
> 		});
> counts1.writeAsText(params.get("output1"));
> env.execute("Socket Window WordCount");
> 
> Best,
> Ovidiu
> 
> 


Re: KeyGroupRangeAssignment ?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ovidiu,

The way Flink works is to assign key group ranges to operators. For each
element you calculate a hash value and based on that you assign it to a key
group. Thus, in your example, you have either a key group with more than 1
key or multiple key groups with 1 or more keys assigned to an operator.

So what you could try to do is to reduce the number of key groups to your
parallelism via env.setMaxParallelism() and then try to figure a key out
whose hashes are uniformly distributed over the key groups. The key group
assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
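
The routing described above can be sketched without a Flink dependency. The key group formula is the one given in this mail; the hash body (modeled on Flink's MathUtils.murmurHash) and the step from key group to operator index (keyGroup * parallelism / maxParallelism) are assumptions that should be checked against the Flink version in use. The names KeyGroupSketch, keyGroupFor and operatorIndexFor are hypothetical:

```java
// Standalone sketch; it does not depend on Flink.
class KeyGroupSketch {

    // Assumed to mirror Flink's MathUtils.murmurHash(int); verify before relying on it.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // final bit mixing
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // fold the result into the non-negative range
        if (code >= 0) return code;
        if (code != Integer.MIN_VALUE) return -code;
        return 0;
    }

    // Key group of a key: murmurHash(key.hashCode()) % maxParallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Operator (subtask) index that owns a key group; assumed formula.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"a", "b", "c", "d"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = operatorIndexFor(keyGroup, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + keyGroup + " -> subtask " + subtask);
        }
    }
}
```

With only as many keys as subtasks, nothing forces the keys onto distinct subtasks, which is why some slots can end up receiving no records at all.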

Alternatively if you don’t need a keyed stream, you could try to use a
custom partitioner via DataStream.partitionCustom.

Cheers,
Till

On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.marcu@inria.fr> wrote:

> Hi,
>
> Can you please comment on how can I ensure stream input records are
> distributed evenly onto task slots?
> See attached screen Records received issue.
>
> I have a simple application which is applying some window function over a
> stream partitioned as follows:
> (parallelism is equal to the number of keys; records with the same key are
> streamed evenly)
>
> // get the execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
> // get input data by connecting to the socket
> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
> DataStream<Tuple8<String, String, String, Integer, String, Double, Long,
> Long>> input = text.flatMap(...);
> DataStream<Double> counts1 = null;
> counts1 =* input.keyBy(0*).countWindow(windowSize, slideSize)
> .apply(new WindowFunction<Tuple8<String, String, String, Integer, String,
> Double, Long, Long>, Double, Tuple, GlobalWindow>() {
> ...
> });
> counts1.writeAsText(params.get("output1"));
> env.execute("Socket Window WordCount");
>
> Best,
> Ovidiu
>