Posted to user@flink.apache.org by Hang Ruan <ru...@gmail.com> on 2023/03/24 08:41:20 UTC

Re: Is there a way to control the parallelism of auto-generated Flink operators of the FlinkSQL job graph?

Hi, Elkhan,

I think this is intended behavior. If the parallelism of an operator is
not specified, it takes the parallelism of the preceding operator instead
of the default parallelism.
The table planner does most of this work for us. I don't think there is
a way to modify the parallelism of every operator. After all, we don't
know which operators the planner will generate when we write the SQL.
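
To make the rule above concrete, here is a toy model in Python. It is not
Flink code and does not reflect the planner's actual implementation; the
`resolve_parallelism` helper and the operator names are made up for
illustration. It assumes a linear chain in which an operator without an
explicit parallelism takes its upstream operator's value, and only the
first operator falls back to the default.

```python
from typing import Dict, List, Optional, Tuple


def resolve_parallelism(
    chain: List[Tuple[str, Optional[int]]], default: int
) -> Dict[str, int]:
    """Toy model: resolve parallelism along a linear operator chain.

    Each entry is (operator_name, explicit_parallelism or None).
    """
    resolved: Dict[str, int] = {}
    upstream = default  # only used if the first operator has no explicit setting
    for name, explicit in chain:
        # An explicit setting wins; otherwise inherit from the upstream operator.
        upstream = explicit if explicit is not None else upstream
        resolved[name] = upstream
    return resolved


# Hypothetical chain mirroring the job in the question below:
# the source is pinned to 5, the view's operator has no explicit setting.
job = [("KafkaSource", 5), ("enhancedEvents", None)]
result = resolve_parallelism(job, default=3)
print(result)  # {'KafkaSource': 5, 'enhancedEvents': 5}
```

Under this model the default of 3 never reaches `enhancedEvents`, because
the source upstream of it already carries an explicit value of 5.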

Best,
Hang

Elkhan Dadashov <el...@gmail.com> wrote on Fri, Mar 24, 2023 at 14:14:

> Checking with the community again, in case anyone has explored this before.
>
> Thanks.
>
>
> On Fri, Mar 17, 2023 at 1:56 PM Elkhan Dadashov <elkhan.dadashov@gmail.com> wrote:
>
> > Dear Flink developers,
> >
> > I wanted to check whether there is a way to control the parallelism of
> > auto-generated Flink operators in the FlinkSQL job graph.
> >
> > In Java API, it is possible to have full control of the parallelism of
> > each operator.
> >
> > In FlinkSQL, some source and sink connectors support `source.parallelism`
> > and `sink.parallelism`, and the rest can be set via `parallelism.default`.
> >
> > In this particular scenario, `enhancedEvents` gets chained to the
> > KafkaSource operator. It can be separated by calling disableChaining() on
> > the KafkaSource stream on the Kafka connector side, but even with chaining
> > disabled on the source stream, the `enhancedEvents` operator parallelism is
> > still set to 5 (same as the Kafka source operator parallelism) instead of 3
> > (the default parallelism):
> >
> > ```sql
> > SET 'parallelism.default' = '3';
> >
> > CREATE TABLE input_kafka_table
> > (
> >     ...
> >     ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT),3),
> >     WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
> > ) WITH (
> >     'connector' = 'kafka',
> >     'source.parallelism' = '5' -- supported by a customization of the Kafka connector
> >     ...
> > );
> >
> > CREATE TEMPORARY VIEW enhancedEvents AS (
> >      SELECT x, y
> >      FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y))
> > );
> >
> > CREATE TABLE other_table_source (...) WITH(...);
> > CREATE TABLE other_table_sink (...) WITH(...);
> >
> > BEGIN STATEMENT SET;
> >  INSERT INTO enhancedEventsSink (SELECT * FROM enhancedEvents);
> >  INSERT INTO other_table_sink (SELECT z FROM other_table_source);
> > END;
> > ```
> >
> > Is there a way to force override parallelism of auto-generated operators
> > for FlinkSQL pipeline?
> >
> > Or is it expected that some operators take their parallelism from
> > another operator rather than from the default parallelism?
> >
> > Want to understand if this is a bug or intended behavior.
> >
> > Thank you.
> >
> >
>