Posted to user@flink.apache.org by Data Engineer <da...@gmail.com> on 2018/03/28 06:18:24 UTC
How does setMaxParallelism work
I went through the explanation on MaxParallelism in the official docs here:
https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
However, I am not able to figure out how Flink decides the parallelism
value.
For instance, if I call setMaxParallelism(3), I see that only 1 subtask is
created for my job. How did Flink decide that 1 subtask was enough?
Regards,
James
Re: How does setMaxParallelism work
Posted by Nico Kruber <ni...@data-artisans.com>.
No. Currently, it is up to you to decide whether you need to scale and
how. If you decide to rescale a running Flink job, you cancel it with a
savepoint and then resume from that savepoint with the new parallelism:
- flink cancel --withSavepoint <targetDirectory> <Job ID>
- flink run -p <newParallelism> --fromSavepoint <savepointPath> <jar-file> <arguments>
Nico
On 29/03/18 19:27, NEKRASSOV, ALEXEI wrote:
> Is there an auto-scaling feature in Flink, where I start with parallelism of (for example) 1, but Flink notices I have high volume of data to process, and automatically increases parallelism of a running job?
>
> Thanks,
> Alex
--
Nico Kruber | Software Engineer
data Artisans
Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
RE: How does setMaxParallelism work
Posted by "NEKRASSOV, ALEXEI" <an...@att.com>.
Is there an auto-scaling feature in Flink, where I start with parallelism of (for example) 1, but Flink notices I have high volume of data to process, and automatically increases parallelism of a running job?
Thanks,
Alex
-----Original Message-----
From: Nico Kruber [mailto:nico@data-artisans.com]
Sent: Wednesday, March 28, 2018 8:54 AM
To: Data Engineer <da...@gmail.com>
Cc: Jörn Franke <jo...@gmail.com>; user@flink.apache.org
Subject: Re: How does setMaxParallelism work
Flink does not decide the parallelism based on your job.
There is a default parallelism (configured via parallelism.default [1], by default 1) which is used if you do not specify it yourself.
Nico
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options
Re: How does setMaxParallelism work
Posted by Nico Kruber <ni...@data-artisans.com>.
Flink does not decide the parallelism based on your job.
There is a default parallelism (configured via parallelism.default [1],
by default 1) which is used if you do not specify it yourself.
Nico
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#common-options
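To make the answer above concrete, here is a small self-contained Java
sketch. It is a toy model, not Flink code: the class ParallelismSketch and
the method effectiveParallelism are made up for illustration. It assumes the
precedence described in Flink's parallel-execution documentation (operator
setting, then execution environment, then the client's -p, then
parallelism.default). Max parallelism is deliberately not an input, which is
why calling only setMaxParallelism(3) still yields 1 subtask.

```java
import java.util.OptionalInt;

/** Toy model (not Flink code) of how the number of subtasks is chosen. */
final class ParallelismSketch {

    /**
     * Returns the first parallelism that was set explicitly, checking the
     * operator, then the execution environment, then the client (-p), and
     * finally falling back to the configured parallelism.default.
     * Assumption: this precedence order follows the Flink documentation.
     */
    static int effectiveParallelism(OptionalInt operatorLevel,
                                    OptionalInt environmentLevel,
                                    OptionalInt clientLevel,
                                    int configuredDefault) {
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();
        }
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();
        }
        if (clientLevel.isPresent()) {
            return clientLevel.getAsInt();
        }
        return configuredDefault;
    }

    public static void main(String[] args) {
        OptionalInt unset = OptionalInt.empty();

        // Only setMaxParallelism(3) was called: nothing sets the parallelism,
        // so parallelism.default (1 unless changed) applies. Prints 1.
        System.out.println(effectiveParallelism(unset, unset, unset, 1));

        // setParallelism(3) on the operator. Prints 3.
        System.out.println(effectiveParallelism(OptionalInt.of(3), unset, unset, 1));

        // "flink run -p 2" with nothing set in the program. Prints 2.
        System.out.println(effectiveParallelism(unset, unset, OptionalInt.of(2), 1));
    }
}
```

The first call matches the situation reported in this thread: with
taskmanager.numberOfTaskSlots at 4 and only a max parallelism set, the job
still runs with the default of 1.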
On 28/03/18 13:21, Data Engineer wrote:
> Agreed. But how did Flink decide that it should allot 1 subtask? Why not
> 2 or 3?
> I am trying to understand the implications of using setMaxParallelism vs
> setParallelism
Re: How does setMaxParallelism work
Posted by Data Engineer <da...@gmail.com>.
Agreed. But how did Flink decide that it should allot 1 subtask? Why not 2
or 3?
I am trying to understand the implications of using setMaxParallelism vs
setParallelism
On Wed, Mar 28, 2018 at 2:58 PM, Nico Kruber <ni...@data-artisans.com> wrote:
> Hi James,
> the number of subtasks being used is defined by the parallelism, the max
> parallelism, however, "... determines the maximum parallelism to which
> you can scale operators" [1]. That is, once set, you cannot ever (even
> after restarting your program from a savepoint) increase the operator's
> parallelism above this value. The actual parallelism can be set per job
> in your program but also in the flink client:
> flink run -p <parallelism> <jar-file> <arguments>
>
>
> Nico
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
Re: How does setMaxParallelism work
Posted by Nico Kruber <ni...@data-artisans.com>.
Hi James,
the number of subtasks is defined by the parallelism. The max
parallelism, however, "... determines the maximum parallelism to which
you can scale operators" [1]. That is, once set, you can never (even
after restarting your program from a savepoint) increase the operator's
parallelism above this value. The actual parallelism can be set per job
in your program, but also in the Flink client:
flink run -p <parallelism> <jar-file> <arguments>
Nico
[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
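Why the upper bound exists is not spelled out in this thread. My
understanding is that Flink splits keyed state into as many key groups as the
max parallelism and hands each subtask a contiguous range of them. The sketch
below is a toy model of that idea with made-up names (KeyGroupSketch,
subtaskForKeyGroup, keyGroupsPerSubtask). The index formula mirrors what I
believe Flink's KeyGroupRangeAssignment uses, so treat it as an assumption
rather than a reference.

```java
import java.util.Arrays;

/** Toy model (not Flink code) of key groups being split across subtasks. */
final class KeyGroupSketch {

    /**
     * Index of the subtask that owns the given key group.
     * Assumption: integer division, as in keyGroup * parallelism / maxParallelism.
     */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Counts how many key groups each subtask ends up owning. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            counts[subtaskForKeyGroup(maxParallelism, parallelism, keyGroup)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Max parallelism 3, parallelism 1: one subtask owns all key groups.
        System.out.println(Arrays.toString(keyGroupsPerSubtask(3, 1))); // [3]

        // Max parallelism 3, parallelism 3: one key group per subtask.
        System.out.println(Arrays.toString(keyGroupsPerSubtask(3, 3))); // [1, 1, 1]

        // Hypothetical parallelism 4 above the max: the last subtask gets nothing.
        System.out.println(Arrays.toString(keyGroupsPerSubtask(3, 4))); // [1, 1, 1, 0]
    }
}
```

With a max parallelism of 3, a fourth subtask would own no key groups, which
is consistent with the parallelism never being allowed above the max
parallelism.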
On 28/03/18 09:25, Data Engineer wrote:
> I have a sample application that reads around 2 GB of csv files,
> converts each record into Avro object and sends it to kafka.
> I use a custom FileReader that reads the files in a directory.
> I have set taskmanager.numberOfTaskSlots to 4.
> I see that if I use setParallelism(3), 3 subtasks are created. But if I
> use setMaxParallelism(3), only 1 subtask is created.
Re: How does setMaxParallelism work
Posted by Data Engineer <da...@gmail.com>.
I have a sample application that reads around 2 GB of CSV files, converts
each record into an Avro object, and sends it to Kafka.
I use a custom FileReader that reads the files in a directory.
I have set taskmanager.numberOfTaskSlots to 4.
I see that if I use setParallelism(3), 3 subtasks are created. But if I use
setMaxParallelism(3), only 1 subtask is created.
On Wed, Mar 28, 2018 at 12:29 PM, Jörn Franke <jo...@gmail.com> wrote:
> What was the input format, the size and the program that you tried to
> execute
Re: How does setMaxParallelism work
Posted by Jörn Franke <jo...@gmail.com>.
What was the input format, the size, and the program that you tried to execute?
> On 28. Mar 2018, at 08:18, Data Engineer <da...@gmail.com> wrote:
>
> I went through the explanation on MaxParallelism in the official docs here:
> https://ci.apache.org/projects/flink/flink-docs-master/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly
>
> However, I am not able to figure out how Flink decides the parallelism value.
> For instance, if I setMaxParallelism to 3, I see that for my job, there is only 1 subtask that is created. How did Flink decide that 1 subtask was enough?
>
> Regards,
> James