Posted to user@flink.apache.org by Jamie Grier <ja...@data-artisans.com> on 2017/01/02 20:48:35 UTC

Re: Set Parallelism and keyBy

Dominik,

This should work just as you expect.  The output of print() is probably just
misleading you.  The print() operator will still have a parallelism of two,
but the flatMap() will have a parallelism of 16, and all data elements with
the same key will be routed to the same flatMap() subtask.

Any sequence of keyBy().flatMap() will always properly partition the data
across the instances of the flatMap() operator by key.
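To make the routing guarantee concrete, here is a minimal, self-contained sketch of why every record with the same key lands on the same downstream subtask. This is a simplified hash-partitioning model, not Flink's actual key-group assignment (which additionally involves murmur hashing and a configurable max parallelism), but the property it demonstrates is the same: the target subtask index is a pure function of the key, independent of the sink's parallelism.

```scala
object KeyRouting {
  // Simplified stand-in for keyBy() routing: map a key deterministically
  // to one of `parallelism` subtask indices.
  def subtaskFor(key: String, parallelism: Int): Int =
    math.abs(key.hashCode % parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 16
    // Hypothetical event types, standing in for _.eventType values.
    val events = Seq("click", "view", "click", "purchase", "click")
    // Each event is routed by its key; repeated keys always hit the
    // same subtask index, no matter how often they occur.
    events.foreach { key =>
      println(s"${subtaskFor(key, parallelism)}> $key")
    }
  }
}
```

The `N>` prefix printed by Flink's print() sink shows the *sink's* subtask index (here parallelism two), which is why it can look as if keyed routing were broken even when the flatMap() partitioning is working correctly.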

-Jamie


On Mon, Dec 26, 2016 at 10:52 AM, Dominik Bruhn <do...@dbruhn.de> wrote:

> Hey,
> I have a flink job which has a default parallelism set to 2. I want to key
> the stream and then apply some flatMap on the keyed stream. The flatMap
> operation is quiet costly, so I want to have a much higher parallelism here
> (lets say 16). Additionally, it is important that the flatMap operation is
> executed for the same key always in the same process or in the same task.
>
> I have the following code:
>
> ----
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()
> ----
>
> This works fine, and the "ExpensiveOperation" is executed always on the
> same tasks for the same keys.
>
> Now I tried two things:
>
> 1.
> ----
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()
> ----
> This fails with an exception because I can't set the parallelism on the
> keyBy operator.
>
> 2.
> -----
> env.setParallelism(2)
> val input: DataStream[Event] = /* from somewhere */
> input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()
> -----
> While this executes, it breaks the assignment of the keys to the tasks:
> the "ExpensiveOperation" is no longer always executed on the same nodes
> for the same keys (visible from the prefixes in the print() output).
>
> What am I doing wrong? Is my only option to set the parallelism of the
> whole flink job to 16?
>
> Thanks, have nice holidays,
> Dominik
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
jamie@data-artisans.com