Posted to issues@flink.apache.org by "Rui Fan (Jira)" <ji...@apache.org> on 2023/09/20 15:27:00 UTC

[jira] [Commented] (FLINK-33123) Wrong dynamic replacement of partitioner from FORWARD to REBLANCE for autoscaler and adaptive scheduler

    [ https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767162#comment-17767162 ] 

Rui Fan commented on FLINK-33123:
---------------------------------

Hi [~Zhanghao Chen], thanks for your report!

Your analysis is correct, and I also found this bug last month. I didn't report it because I didn't know how to fix it properly.

First of all, I prefer changing it from FORWARD to REBALANCE, and I explained the reason in the [code review|https://github.com/apache/flink/pull/21443#discussion_r1042919428] of FLINK-30213.

Why don't I know how to fix it properly?

As you said, we should change the distribution type to ALL_TO_ALL in the JobGraph. There are some issues here:
 * Issue 1: The JobGraph is generated only once for the Adaptive Scheduler, and it isn't changed even if the parallelism changes (only the ExecutionGraph is updated).
 * Issue 2: Even if issue 1 is solved, how do we handle switching back from REBALANCE to FORWARD?
 ** Assume a job has taskA and taskB, both with parallelism 3, and the user uses the FORWARD partitioner.
 ** Time1: the parallelism of taskA is changed to 2, so we should do 2 things:
 *** replace the FORWARD partitioner with the REBALANCE partitioner in StreamTask;
 *** change the distribution type to ALL_TO_ALL in the JobGraph.
 ** Time2: the parallelism of taskB is also changed to 2, so we should do 2 things:
 *** switch back to the FORWARD partitioner;
 *** switch back to the POINTWISE distribution type.
 ** This case is fine.
 ** However, assume a job has taskA and taskB, both with parallelism 3, and the user uses the *REBALANCE* partitioner.
 *** In this case, the user chose the REBALANCE partitioner even though the FORWARD partitioner could be used here.
 *** Here we should keep the REBALANCE partitioner and ALL_TO_ALL at both Time1 and Time2.
 ** So at Time2 we need to decide whether to switch to the FORWARD partitioner and POINTWISE distribution type, and that depends on which partitioner the user originally chose.
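To make issue 2 concrete, here is a minimal sketch of the decision Time2 would need. It assumes that the partitioner the user originally chose is recorded alongside the edge. Flink does not necessarily keep this information today. The class, enum, record and method names are hypothetical and are not Flink API.

```java
/**
 * Hypothetical sketch, not Flink API: pick the effective partitioner and
 * distribution type of one edge after a rescale.
 */
public class EdgeRescalePolicy {

    enum Partitioner { FORWARD, REBALANCE }

    enum DistributionType { POINTWISE, ALL_TO_ALL }

    /** Effective settings of an edge for the current parallelisms. */
    record EdgeSettings(Partitioner partitioner, DistributionType distribution) {}

    /**
     * userPartitioner is the partitioner from the user's program. It has to be
     * remembered across rescales: at Time1 both scenarios look identical
     * (REBALANCE + ALL_TO_ALL), so the current settings alone cannot decide Time2.
     */
    static EdgeSettings resolve(Partitioner userPartitioner, int upstreamParallelism, int downstreamParallelism) {
        if (userPartitioner == Partitioner.FORWARD && upstreamParallelism == downstreamParallelism) {
            // FORWARD is only valid when both sides have the same parallelism.
            return new EdgeSettings(Partitioner.FORWARD, DistributionType.POINTWISE);
        }
        // Either the user chose REBALANCE explicitly, or FORWARD is no longer
        // valid because the parallelisms differ: use all-to-all connections.
        return new EdgeSettings(Partitioner.REBALANCE, DistributionType.ALL_TO_ALL);
    }

    public static void main(String[] args) {
        // Scenario 1: user chose FORWARD, taskA (upstream) and taskB (downstream) start at 3.
        System.out.println("FORWARD,   Time1: " + resolve(Partitioner.FORWARD, 2, 3));
        System.out.println("FORWARD,   Time2: " + resolve(Partitioner.FORWARD, 2, 2));
        // Scenario 2: user chose REBALANCE explicitly.
        System.out.println("REBALANCE, Time1: " + resolve(Partitioner.REBALANCE, 2, 3));
        System.out.println("REBALANCE, Time2: " + resolve(Partitioner.REBALANCE, 2, 2));
    }
}
```

At Time2, only the FORWARD scenario goes back to FORWARD + POINTWISE. The REBALANCE scenario stays on ALL_TO_ALL. Both results depend on the remembered user choice, not on the current edge settings.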

I'm not sure whether these 2 issues are clear.

Please let me know if I'm wrong, thanks~

> Wrong dynamic replacement of partitioner from FORWARD to REBLANCE for autoscaler and adaptive scheduler
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33123
>                 URL: https://issues.apache.org/jira/browse/FLINK-33123
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Runtime / Coordination
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Zhanghao Chen
>            Priority: Critical
>         Attachments: image-2023-09-20-15-09-22-733.png, image-2023-09-20-15-14-04-679.png
>
>
> *Background*
> https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is wrong when the parallelism of a vertex with a FORWARD edge is changed. This can happen with both the autoscaler and the adaptive scheduler, since both can change the vertex parallelism dynamically. The fix dynamically replaces the partitioner from FORWARD to REBALANCE on task deployment in {{StreamTask}}:
>  
> !image-2023-09-20-15-09-22-733.png|width=560,height=221!
> *Problem*
> Unfortunately, the fix is still buggy in two aspects:
>  # The connections between upstream and downstream tasks are determined by the distribution type of the partitioner when the execution graph is generated on the JM side. When the edge is FORWARD, the distribution type is POINTWISE, and Flink tries to distribute subpartitions evenly across all downstream tasks. To change the edge to REBALANCE, the distribution type has to be changed to ALL_TO_ALL so that every upstream task connects to every downstream task. However, the fix did not change the distribution type, so the network connections are set up incorrectly.
>  # The FORWARD partitioner is replaced if environment.getWriter(outputIndex).getNumberOfSubpartitions() does not equal the task parallelism. However, the number of subpartitions here equals the number of downstream tasks connected to this particular task, which is also determined by the distribution type of the partitioner when the execution graph is generated on the JM side. On a POINTWISE edge, each upstream task is connected to either floor(downstream task parallelism / upstream task parallelism) or ceil(downstream task parallelism / upstream task parallelism) downstream tasks. Whenever one of these values equals the upstream task parallelism, some tasks have a number of subpartitions equal to the task parallelism. For example, in a topology A (parallelism 2) -> B (parallelism 5), one A task has 2 subpartitions and the other A task has 3 subpartitions. The first task's number of subpartitions equals the task parallelism 2, so it skips the partitioner replacement. As a result, that task sends data to only one downstream task, because the FORWARD partitioner always sends data to the first subpartition. In fact, in a normal job with a FORWARD edge and no autoscaling action, the partitioner is changed to REBALANCE internally, because the number of subpartitions is always 1 in this case.
> !image-2023-09-20-15-14-04-679.png|width=892,height=301!
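The subpartition arithmetic in point 2 of the quoted description can be reproduced with a small model. This is a simplified sketch and not Flink source code. The class and method names are hypothetical. The per-subtask range formula `[i * d / u, (i + 1) * d / u)` is an assumption: Flink's exact range boundaries may differ, although each upstream subtask still gets either floor(d/u) or ceil(d/u) downstream subtasks. `replacesForward` models the condition as the description states it: FORWARD is replaced unless the subpartition count equals the task parallelism.

```java
import java.util.Arrays;

/**
 * Simplified model, not Flink source: subpartition counts on a POINTWISE edge
 * and the "replace FORWARD unless counts match" check described in point 2.
 */
public class PointwiseForwardCheck {

    /**
     * Number of downstream subtasks each upstream subtask connects to on a
     * POINTWISE edge. Assumed range for upstream subtask i when upstream <
     * downstream: [i * d / u, (i + 1) * d / u) with integer division.
     */
    static int[] pointwiseSubpartitions(int upstream, int downstream) {
        int[] counts = new int[upstream];
        if (upstream >= downstream) {
            // Each upstream subtask feeds exactly one downstream subtask.
            Arrays.fill(counts, 1);
            return counts;
        }
        for (int i = 0; i < upstream; i++) {
            int start = i * downstream / upstream;
            int end = (i + 1) * downstream / upstream;
            counts[i] = end - start;
        }
        return counts;
    }

    /** The replacement condition: swap FORWARD for REBALANCE unless the counts match. */
    static boolean replacesForward(int numberOfSubpartitions, int taskParallelism) {
        return numberOfSubpartitions != taskParallelism;
    }

    public static void main(String[] args) {
        int upstream = 2;   // A
        int downstream = 5; // B
        int[] counts = pointwiseSubpartitions(upstream, downstream);
        for (int i = 0; i < counts.length; i++) {
            // A subtask that keeps FORWARD only ever writes to subpartition 0.
            System.out.printf("A[%d]: %d subpartitions, FORWARD replaced: %b%n",
                    i, counts[i], replacesForward(counts[i], upstream));
        }
    }
}
```

In this model, A[0] gets 2 subpartitions and keeps FORWARD, so it writes only to its first downstream subtask. A[1] gets 3 subpartitions and switches to REBALANCE. Which subtask gets 2 and which gets 3 may differ in Flink itself. For equal parallelism (for example 3 -> 3), every count is 1, so the check always fires. This matches the note about normal jobs without autoscaling.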



--
This message was sent by Atlassian Jira
(v8.20.10#820010)