You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2016/04/27 06:25:12 UTC

Tuning parallelism in cascading-flink planner

Hi all,

I’m busy tuning up a workflow (defined w/Cascading, planned with Flink) that runs on a 5 slave EMR cluster.

The default parallelism (from the Flink planner) is set to 40, since I’ve got 5 task managers (one per node) and 8 slots/TM.

But this seems to jam things up, as I see simultaneous GroupReduce subtasks competing for resources (or so it seems).

Any insight into how to tune this?

And what’s the right way to set it on a sub-task basis? With Cascading Flows planned for M-R I can set the number of reducers via a Hadoop JobConf configuration setting, on a per-step (to use Cascading lingo) basis. But with a Flow planned for Flink, there’s only a single “step”.

Thanks,

— Ken


Re: Tuning parallelism in cascading-flink planner

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ken,

at the moment, there are just two parameters to control the parallelism of
Flink operators generated by the Cascading-Flink connector.

The parameters are:
- flink.num.sourceTasks to specify the parallelism of source tasks.
- flink.num.shuffleTasks to specify the parallelism of all shuffling tasks
(GroupBy, CoGroup).

Non-shuffling operators such as Each/Map and HashJoin take the parallelism
of their predecessor (for HashJoin the first input) to avoid random
shuffling.
So an Each/Map or Join that immediately follows a source runs with the
source parallelism.
Effectively, most operators will run with the shuffle parallelism, because
Each and HashJoin pick it up once their input was shuffled.

It is currently not possible to specify the parallelism of an individual
task.
However, I am open for suggestions if you have a good idea to improve the
situation.
I think we should continue the discussion on the Cascading-Flink Github
project since this is feature would not require changes in Flink but only
in the Cascading Flink runner.

Best, Fabian

2016-04-27 6:25 GMT+02:00 Ken Krugler <kk...@transpac.com>:

> Hi all,
>
> I’m busy tuning up a workflow (defined w/Cascading, planned with Flink)
> that runs on a 5 slave EMR cluster.
>
> The default parallelism (from the Flink planner) is set to 40, since I’ve
> got 5 task managers (one per node) and 8 slots/TM.
>
> But this seems to jam things up, as I see simultaneous GroupReduce
> subtasks competing for resources (or so it seems).
>
> Any insight into how to tune this?
>
> And what’s the right way to set it on a sub-task basis? With Cascading
> Flows planned for M-R I can set the number of reducers via a Hadoop JobConf
> configuration setting, on a per-step (to use Cascading lingo) basis. But
> with a Flow planned for Flink, there’s only a single “step”.
>
> Thanks,
>
> — Ken
>
>