You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by John Tipper <jo...@hotmail.com> on 2022/07/18 16:22:53 UTC

PyFlink SQL: force maximum use of slots

Hi all,

Is there a way of forcing a pipeline to use as many slots as possible?  I have a program in PyFlink using SQL and the Table API and currently all of my pipeline is using just a single slot.  I've tried this:

      StreamExecutionEnvironment.get_execution_environment().disable_operator_chaining()

I did have a pipeline which had 17 different tasks running in just a single slot, but all this does is give me 79 different operators but all are still running in a single slot. Is there a way to get Flink to run different jobs in different slots whilst using the Table API and SQL?

Many thanks,

John

Re: PyFlink SQL: force maximum use of slots

Posted by Dian Fu <di...@gmail.com>.
>> I understand that in the Datastream world parallelism means each slot
will get a subset of events. However, how does that work in the SQL world
where you need to do joins between tables?
In Table API & SQL, each slot will also only get a subset of events.

>> If the events in tables A and B are being processed in parallel, then
does this not mean that the slots will have only some events for those
tables A and B in any given slot? How does Flink ensure consistency of
results irrespective of the parallelism used, or does it just copy all
events to all slots, in which case I don't understand how parallelism
assists?
It will partition the events according to the join key and so the events
belonging to the same join key will be sent to the same slot no matter what
parallelism is set. The parallelism only affects how much data one slot
will handle.

Regards
Dian

On Wed, Jul 20, 2022 at 4:44 PM John Tipper <jo...@hotmail.com> wrote:

> Hi Dian,
>
> Thanks very much - I suppose the concept I'm struggling with is
> understanding how parallelism works when using SQL. I understand that in
> the Datastream world parallelism means each slot will get a subset of
> events. However, how does that work in the SQL world where you need to do
> joins between tables? If the events in tables A and B are being processed
> in parallel, then does this not mean that the slots will have only some
> events for those tables A and B in any given slot? How does Flink ensure
> consistency of results irrespective of the parallelism used, or does it
> just copy all events to all slots, in which case I don't understand how
> parallelism assists?
>
> Many thanks,
>
> John
>
> ------------------------------
> *From:* Dian Fu <di...@gmail.com>
> *Sent:* 20 July 2022 05:19
> *To:* John Tipper <jo...@hotmail.com>
> *Cc:* user@flink.apache.org <us...@flink.apache.org>
> *Subject:* Re: PyFlink SQL: force maximum use of slots
>
> Hi John,
>
> All the operators in the same slot sharing group will be put in one slot.
> The slot sharing group is only configurable in DataStream API [1]. Usually
> you don't need to set the slot sharing group explicitly [2] and this is
> good to share the resource between the operators running in the same
> slot. If the performance becomes a problem, you could just increase the
> parallelism or the resource(CPU or Memory) of the TM. For example, if the
> parallelism is set to 2, you will see that there will be two running slots
> and each slot containing all the operators.
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/#set-slot-sharing-group
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/concepts/flink-architecture/#task-slots-and-resources
>
> On Tue, Jul 19, 2022 at 12:23 AM John Tipper <jo...@hotmail.com>
> wrote:
>
> Hi all,
>
> Is there a way of forcing a pipeline to use as many slots as possible?  I
> have a program in PyFlink using SQL and the Table API and currently all of
> my pipeline is using just a single slot.  I've tried this:
>
> StreamExecutionEnvironment.
> get_execution_environment().disable_operator_chaining()
>
> I did have a pipeline which had 17 different tasks running in just a
> single slot, but all this does is give me 79 different operators but all
> are still running in a single slot. Is there a way to get Flink to run
> different jobs in different slots whilst using the Table API and SQL?
>
> Many thanks,
>
> John
>
>

Re: PyFlink SQL: force maximum use of slots

Posted by John Tipper <jo...@hotmail.com>.
Hi Dian,

Thanks very much - I suppose the concept I'm struggling with is understanding how parallelism works when using SQL. I understand that in the Datastream world parallelism means each slot will get a subset of events. However, how does that work in the SQL world where you need to do joins between tables? If the events in tables A and B are being processed in parallel, then does this not mean that the slots will have only some events for those tables A and B in any given slot? How does Flink ensure consistency of results irrespective of the parallelism used, or does it just copy all events to all slots, in which case I don't understand how parallelism assists?

Many thanks,

John

________________________________
From: Dian Fu <di...@gmail.com>
Sent: 20 July 2022 05:19
To: John Tipper <jo...@hotmail.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: PyFlink SQL: force maximum use of slots

Hi John,

All the operators in the same slot sharing group will be put in one slot. The slot sharing group is only configurable in DataStream API [1]. Usually you don't need to set the slot sharing group explicitly [2] and this is good to share the resource between the operators running in the same slot. If the performance becomes a problem, you could just increase the parallelism or the resource(CPU or Memory) of the TM. For example, if the parallelism is set to 2, you will see that there will be two running slots and each slot containing all the operators.

Regards,
Dian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/#set-slot-sharing-group
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/concepts/flink-architecture/#task-slots-and-resources

On Tue, Jul 19, 2022 at 12:23 AM John Tipper <jo...@hotmail.com>> wrote:
Hi all,

Is there a way of forcing a pipeline to use as many slots as possible?  I have a program in PyFlink using SQL and the Table API and currently all of my pipeline is using just a single slot.  I've tried this:

      StreamExecutionEnvironment.get_execution_environment().disable_operator_chaining()

I did have a pipeline which had 17 different tasks running in just a single slot, but all this does is give me 79 different operators but all are still running in a single slot. Is there a way to get Flink to run different jobs in different slots whilst using the Table API and SQL?

Many thanks,

John

Re: PyFlink SQL: force maximum use of slots

Posted by Dian Fu <di...@gmail.com>.
Hi John,

All the operators in the same slot sharing group will be put in one slot.
The slot sharing group is only configurable in DataStream API [1]. Usually
you don't need to set the slot sharing group explicitly [2] and this is
good to share the resource between the operators running in the same
slot. If the performance becomes a problem, you could just increase the
parallelism or the resource(CPU or Memory) of the TM. For example, if the
parallelism is set to 2, you will see that there will be two running slots
and each slot containing all the operators.

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/#set-slot-sharing-group
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/concepts/flink-architecture/#task-slots-and-resources

On Tue, Jul 19, 2022 at 12:23 AM John Tipper <jo...@hotmail.com>
wrote:

> Hi all,
>
> Is there a way of forcing a pipeline to use as many slots as possible?  I
> have a program in PyFlink using SQL and the Table API and currently all of
> my pipeline is using just a single slot.  I've tried this:
>
> StreamExecutionEnvironment.
> get_execution_environment().disable_operator_chaining()
>
> I did have a pipeline which had 17 different tasks running in just a
> single slot, but all this does is give me 79 different operators but all
> are still running in a single slot. Is there a way to get Flink to run
> different jobs in different slots whilst using the Table API and SQL?
>
> Many thanks,
>
> John
>