You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Niklas Semmler (Jira)" <ji...@apache.org> on 2022/03/11 11:05:00 UTC

[jira] [Comment Edited] (FLINK-26330) Test Adaptive Batch Scheduler manually

    [ https://issues.apache.org/jira/browse/FLINK-26330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504863#comment-17504863 ] 

Niklas Semmler edited comment on FLINK-26330 at 3/11/22, 11:04 AM:
-------------------------------------------------------------------

Hi [~kingWang],

I will try to give an example. [~wanglijie95] can you please double-check my logic?

*Setup*
* The job consists of Mappers followed by Reducers.
* The user specifies[1] 1GB data volume per task.
* The user specifies a maximum parallelism of 16[2]. (M = 4)
* The Mappers output a combined 10GB.
* Also, for simplicity let's assume that we only have a single Mapper.

*Background*
* The maximum parallelism is equal to many partitions each preceding task creates. 

*Before the adjustment*
The number of Reducers is determined by the output volume divided by the specified volume. Hence, you would 10 Reducers. Each Mapper produces 16 output partitions. So, we need to distribute 16 partitions over 10 receiving tasks. Let's assume first that we have a single mapper, then each output partition has 10/16 GB and some Reducers would receive two partitions (20/16GB = 1.25 GB) and some just one (10/16GB = 0.626 GB).

*After the adjustment*
The previous number of Reducers is rounded to the closest power of 2. (N = 3) In this case it would be 8. So, we create 8 Reducers and each Reducer receives 2 GB.

*Conclusions*
And, now we have 2 GB instead of 1GB... Okay, so I think there is still something going amiss. Unless, I made an error in my calculation. [~wanglijie95] Do you see a flaw in my reasoning?

Just an idea. If we would instead let the user set the **maximum** volume and always round up in the adjustment, then I think this would always work as long as as the maximum parallelism is large enough. On the downside this will may create quite a large number of partitions. Not sure what the impact is there.

*Disclaimer*
* I am not sure how exactly output partitions are distributed to downstream tasks. I think this uses a round-robin mechanism.
* I am not sure how this differs when you use multiple Mappers. Will this even out the distribution, because each Mapper distributes its volume differently? I would have to look into this further.

[1] via {{ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK}}
[2] via {{ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM}}


was (Author: JIRAUSER281719):
Hi [~kingWang],

I will try to give an example. [~wanglijie95] can you please double-check my logic?

*Setup*
* The job consists of Mappers followed by Reducers.
* The user specifies[1] 1GB data volume per task.
* The user specifies a maximum parallelism of 16[2]. (M = 4)
* The Mappers output a combined 10GB.
* Also, for simplicity let's assume that we only have a single Mapper.

*Background*
* The maximum parallelism is equal to many partitions each preceding task creates. 

*Before the adjustment*
The number of Reducers is determined by the output volume divided by the specified volume. Hence, you would 10 Reducers. Each Mapper produces 16 output partitions. So, we need to distribute 16 partitions over 10 receiving tasks. Let's assume first that we have a single mapper, then each output partition has 10/16 GB and some Reducers would receive two partitions (20/16GB = 1.25 GB) and some just one (10/16GB = 0.626 GB).

*After the adjustment*
The previous number of Reducers is rounded to the closest power of 2. (N = 3) In this case it would be 8. So, we create 8 Reducers and each Reducer receives 2 GB.

*Conclusions*
And, now we have 2 GB instead of 1GB... Okay, so I think there is still something going amiss. Unless, I made an error in my calculation. [~wanglijie95] Do you see a flaw in my reasoning?

Just an idea. If we would instead let the user set the **maximum** volume and always round up in the adjustment, then I think this would always work as long as as the maximum parallelism is large enough. On the downside this will may create quite a large number of partitions. Not sure what the impact is there.

**Disclaimer**
* I am not sure how exactly output partitions are distributed to downstream tasks. I think this uses a round-robin mechanism.
* I am not sure how this differs when you use multiple Mappers. Will this even out the distribution, because each Mapper distributes its volume differently? I would have to look into this further.

[1] via {{ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK}}
[2] via {{ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM}}

> Test Adaptive Batch Scheduler manually
> --------------------------------------
>
>                 Key: FLINK-26330
>                 URL: https://issues.apache.org/jira/browse/FLINK-26330
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: Niklas Semmler
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.15.0
>
>
> Documentation: [https://github.com/apache/flink/pull/18757]
> Run DataStream / SQL batch jobs with Adaptive Batch Scheduler and verifiy:
> 1. Whether the automatically decided parallelism is correct
> 2. Whether the job result is correct
>  
> *For example:*
> {code:java}
> final Configuration configuration = new Configuration();
> configuration.set(
>         JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
> configuration.setInteger(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 4);
> configuration.set(
>         JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK,
>         MemorySize.parse("8kb"));
> configuration.setInteger("parallelism.default", -1);
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(configuration);
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> env.fromSequence(0, 1000).setParallelism(1)
>         .keyBy(num -> num % 10)
>         .sum(0)
>         .addSink(new PrintSinkFunction<>());
> env.execute(); {code}
> You can run above job and check:
>  
> 1. The parallelism of "Keyed Aggregation -> Sink: Unnamed" should be 3. Jobmanager logs show following logs:
> {code:java}
> Parallelism of JobVertex: Keyed Aggregation -> Sink: Unnamed (20ba6b65f97481d5570070de90e4e791) is decided to be 3. {code}
> 2. The job result should be:
> {code:java}
> 50500
> 49600
> 49700
> 49800
> 49900
> 50000
> 50100
> 50200
> 50300
> 50400 {code}
>  
> You can change the amout of data produced by source and config options of adaptive batch scheduler according your wishes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)