Posted to user@flink.apache.org by TechnoMage <ml...@technomage.com> on 2018/04/13 02:30:15 UTC

Question about parallelism

I am pretty new to Flink.  I have a Flink job that has 10 transforms (mostly CoFlatMap, with some simple filters and key extractors as well).  I have the config set for 6 slots and default parallelism of 6, but all my stages show parallelism of 1.  Is that because there is only one task manager?  Some of what I have read suggested separate slots were needed to use multiple threads on a single box.  I have read the section in the docs several times and am still not totally sure about the execution model.

Michael

Re: Question about parallelism

Posted by TechnoMage <ml...@technomage.com>.
The client was not using a config file; it is a stand-alone Java app using the flink-client jar file.  Thanks for the clarification.

Michael

> On Apr 16, 2018, at 2:11 PM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> The parallelism.default property that is configured in the flink-conf.yaml file is only considered if the config file belongs to the submitting client.
> If you configured the property in the config file of your cluster setup and submitted from a client that uses a different configuration file, the property of that other config file is used.
> 
> I tested the behavior on Flink 1.4.2 and setting the parallelism in the flink-conf.yaml of the client was working correctly in a simple local setup.
> 
> If this doesn't solve your problem, we'd need a bit more information about the job submission and setup.
> 
> Best, Fabian
> 
> 
> 2018-04-16 18:37 GMT+02:00 TechnoMage <mlatta@technomage.com>:
> 1.4.2.  I have since set the parallelism explicitly after creating the env and that is working.  I also made the stream object serializable which may also be involved in this.  I will retest without the explicit parallelism when I get a chance.
> 
> Michael
> 
> 
>> On Apr 16, 2018, at 2:05 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>> 
>> (re-adding user mailing list)
>> 
>> A non-serializable function object should cause the job to fail, but not to ignore a parallelism setting.
>> 
>> This might be a bug. Most users specify the parallelism directly in the application code (via StreamExecutionEnvironment) or when submitting the application.
>> Which version are you using?
>> 
>> Best, Fabian
>> 
>> 2018-04-14 15:07 GMT+02:00 Michael Latta <mlatta@technomage.com>:
>> Parallelism in config. I think the issue is that some objects used in the stream are not serializable (which I just discovered). I am surprised it supports that.
>> 
>> 
>> Michael
>> 
>> On Apr 14, 2018, at 6:12 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> The number of TaskManagers is irrelevant for the parallelism of a job or operator. The scheduler only cares about the number of slots.
>>> 
>>> How did you set the default parallelism? In the config or in the program / StreamExecutionEnvironment? 
>>> 
>>> Best, Fabian
>>> 
>>> 
>>> TechnoMage <mlatta@technomage.com> wrote on Fri, Apr 13, 2018, 04:30:
>>> I am pretty new to Flink.  I have a Flink job that has 10 transforms (mostly CoFlatMap, with some simple filters and key extractors as well).  I have the config set for 6 slots and default parallelism of 6, but all my stages show parallelism of 1.  Is that because there is only one task manager?  Some of what I have read suggested separate slots were needed to use multiple threads on a single box.  I have read the section in the docs several times and am still not totally sure about the execution model.
>>> 
>>> Michael
>> 
> 
> 


Re: Question about parallelism

Posted by Fabian Hueske <fh...@gmail.com>.
The parallelism.default property that is configured in the flink-conf.yaml
file is only considered if the config file belongs to the submitting client.
If you configured the property in the config file of your cluster setup and
submitted from a client that uses a different configuration file, the
property of that other config file is used.

I tested the behavior on Flink 1.4.2 and setting the parallelism in the
flink-conf.yaml of the client was working correctly in a simple local setup.

If this doesn't solve your problem, we'd need a bit more information about
the job submission and setup.
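
[Editor's note: the two settings discussed in this thread live in the flink-conf.yaml of the submitting client; a minimal sketch, with illustrative values matching the setup described above:]

```
# flink-conf.yaml of the *submitting client* (values illustrative)
taskmanager.numberOfTaskSlots: 6
parallelism.default: 6
```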

Best, Fabian


2018-04-16 18:37 GMT+02:00 TechnoMage <ml...@technomage.com>:

> 1.4.2.  I have since set the parallelism explicitly after creating the env
> and that is working.  I also made the stream object serializable which may
> also be involved in this.  I will retest without the explicit parallelism
> when I get a chance.
>
> Michael
>
>
> On Apr 16, 2018, at 2:05 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> (re-adding user mailing list)
>
> A non-serializable function object should cause the job to fail, but not
> to ignore a parallelism setting.
>
> This might be a bug. Most users specify the parallelism directly in the
> application code (via StreamExecutionEnvironment) or when submitting the
> application.
> Which version are you using?
>
> Best, Fabian
>
> 2018-04-14 15:07 GMT+02:00 Michael Latta <ml...@technomage.com>:
>
>> Parallelism in config. I think the issue is that some objects used in
>> the stream are not serializable (which I just discovered). I am surprised
>> it supports that.
>>
>>
>> Michael
>>
>> On Apr 14, 2018, at 6:12 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Hi,
>>
>> The number of TaskManagers is irrelevant for the parallelism of a job or
>> operator. The scheduler only cares about the number of slots.
>>
>> How did you set the default parallelism? In the config or in the program
>> / StreamExecutionEnvironment?
>>
>> Best, Fabian
>>
>>
>> TechnoMage <ml...@technomage.com> wrote on Fri, Apr 13, 2018, 04:30:
>>
>>> I am pretty new to Flink.  I have a Flink job that has 10 transforms
>>> (mostly CoFlatMap, with some simple filters and key extractors as
>>> well).  I have the config set for 6 slots and default parallelism of 6,
>>> but all my stages show parallelism of 1.  Is that because there is only
>>> one task manager?  Some of what I have read suggested separate slots
>>> were needed to use multiple threads on a single box.  I have read the
>>> section in the docs several times and am still not totally sure about
>>> the execution model.
>>>
>>> Michael
>>
>>
>
>

Re: Question about parallelism

Posted by TechnoMage <ml...@technomage.com>.
1.4.2.  I have since set the parallelism explicitly after creating the env and that is working.  I also made the stream object serializable which may also be involved in this.  I will retest without the explicit parallelism when I get a chance.
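
[Editor's note: "set the parallelism explicitly after creating the env" looks roughly like the sketch below. This is against the Flink 1.4 streaming API as described in this thread; the job name is illustrative.]

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Explicit job-wide parallelism, set in the application code;
        // this takes precedence over parallelism.default in flink-conf.yaml.
        env.setParallelism(6);
        // ... build the pipeline of transforms here, then:
        // env.execute("my job");
    }
}
```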

Michael

> On Apr 16, 2018, at 2:05 AM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> (re-adding user mailing list)
> 
> A non-serializable function object should cause the job to fail, but not to ignore a parallelism setting.
> 
> This might be a bug. Most users specify the parallelism directly in the application code (via StreamExecutionEnvironment) or when submitting the application.
> Which version are you using?
> 
> Best, Fabian
> 
> 2018-04-14 15:07 GMT+02:00 Michael Latta <mlatta@technomage.com>:
> Parallelism in config. I think the issue is that some objects used in the stream are not serializable (which I just discovered). I am surprised it supports that.
> 
> 
> Michael
> 
> On Apr 14, 2018, at 6:12 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> 
>> Hi,
>> 
>> The number of TaskManagers is irrelevant for the parallelism of a job or operator. The scheduler only cares about the number of slots.
>> 
>> How did you set the default parallelism? In the config or in the program / StreamExecutionEnvironment? 
>> 
>> Best, Fabian
>> 
>> 
>> TechnoMage <mlatta@technomage.com> wrote on Fri, Apr 13, 2018, 04:30:
>> I am pretty new to Flink.  I have a Flink job that has 10 transforms (mostly CoFlatMap, with some simple filters and key extractors as well).  I have the config set for 6 slots and default parallelism of 6, but all my stages show parallelism of 1.  Is that because there is only one task manager?  Some of what I have read suggested separate slots were needed to use multiple threads on a single box.  I have read the section in the docs several times and am still not totally sure about the execution model.
>> 
>> Michael
> 


Re: Question about parallelism

Posted by Fabian Hueske <fh...@gmail.com>.
(re-adding user mailing list)

A non-serializable function object should cause the job to fail, but not to
ignore a parallelism setting.
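
[Editor's note: a quick way to catch a non-serializable function object before submitting the job is to try Java-serializing it. The class and method names below (MyFunction, isSerializable) are illustrative, not Flink API; in Flink, user functions such as CoFlatMapFunction implementations must be Serializable.]

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a user function object.
// A non-transient, non-serializable field here would make
// serialization fail at job-submission time.
class MyFunction implements Serializable {
    private static final long serialVersionUID = 1L;
}

public class SerializabilityCheck {
    // Returns true if the object can be Java-serialized.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new MyFunction()));
        System.out.println(isSerializable(new Object()));
    }
}
```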

This might be a bug. Most users specify the parallelism directly in the
application code (via StreamExecutionEnvironment) or when submitting the
application.
Which version are you using?

Best, Fabian

2018-04-14 15:07 GMT+02:00 Michael Latta <ml...@technomage.com>:

> Parallelism in config. I think the issue is that some objects used in the
> stream are not serializable (which I just discovered). I am surprised it
> supports that.
>
>
> Michael
>
> On Apr 14, 2018, at 6:12 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi,
>
> The number of TaskManagers is irrelevant for the parallelism of a job or
> operator. The scheduler only cares about the number of slots.
>
> How did you set the default parallelism? In the config or in the program /
> StreamExecutionEnvironment?
>
> Best, Fabian
>
>
> TechnoMage <ml...@technomage.com> wrote on Fri, Apr 13, 2018, 04:30:
>
>> I am pretty new to Flink.  I have a Flink job that has 10 transforms
>> (mostly CoFlatMap, with some simple filters and key extractors as
>> well).  I have the config set for 6 slots and default parallelism of 6,
>> but all my stages show parallelism of 1.  Is that because there is only
>> one task manager?  Some of what I have read suggested separate slots
>> were needed to use multiple threads on a single box.  I have read the
>> section in the docs several times and am still not totally sure about
>> the execution model.
>>
>> Michael
>
>