You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Zhilong Hong (Jira)" <ji...@apache.org> on 2021/03/23 04:07:00 UTC

[jira] [Created] (FLINK-21920) Optimize DefaultScheduler#allocateSlots

Zhilong Hong created FLINK-21920:
------------------------------------

             Summary: Optimize DefaultScheduler#allocateSlots
                 Key: FLINK-21920
                 URL: https://issues.apache.org/jira/browse/FLINK-21920
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Coordination
    Affects Versions: 1.13.0
            Reporter: Zhilong Hong
             Fix For: 1.13.0


Based on the scheduler benchmark introduced in FLINK-21731, we find that there are several procedures related to {{DefaultScheduler#allocateSlots}} have O(N^2) complexity. 

 

The first one is: {{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}. The original implementation is:

 
{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine the ExecutionSlotSharingGroup where the producer vertex locates is available for current vertex{code}
This procedure has O(N^2) complexity.

It's obvious that the result partitions in the same ConsumedPartitionGroup have the same producer vertex. So we can just iterate over the ConsumedPartitionGroups instead of all the consumed partitions. This will decrease the complexity from O(N^2) to O(N).

 

The second one is: {{ExecutionGraphToInputsLocationsRetrieverAdapter#getConsumedResultPartitionsProducers}}. The original implementation is:
{code:java}
for all SchedulingExecutionVertex in DefaultScheduler:
  for all ConsumedPartitionGroup of the SchedulingExecutionVertex:
    for all IntermediateResultPartition in the ConsumedPartitionGroup:
      get producer of the IntermediateResultPartition {code}
This procedure has O(N^2) complexity.

We can see that for each SchedulingExecutionVertex, the producers of its ConsumedPartitionGroup is calculated separately. For SchedulingExecutionVertices in the same ConsumerVertexGroup, they have the same ConsumedPartitionGroup. Thus, we don't need to calculate the producers over and over again. We can use a local cache to cache the producers. This will decrease the complexity from O(N^2) to O(N).

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)