Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/15 06:53:10 UTC

[GitHub] [flink] zhuzhurk edited a comment on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

zhuzhurk edited a comment on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708543149


   > Thanks for creating this fix @zhuzhurk. If I understand the problem correctly, then I think the PR fixes the problem. However, I have a couple of related questions:
   > 
   > 1. `PipelinedRegionSchedulingConcurrentFailureTest.testNoImmediateSlotAllocationFailureOnConcurrentFailure` seems to be able to reproduce the problem because the `RestartPipelinedRegionFailoverStrategy` also cancels not yet started downstream consumers. Why is this the case? Shouldn't it only be necessary to cancel `Executions` which are not `CREATED`?
   > 2. `ExecutionGraphToInputsLocationsRetrieverAdapter` seems to solve the problem of deadlocking the `MergingSharedSlotProfileRetriever` by returning an input future of an execution which is about to be scheduled by checking on the `CREATED` state. However, it returns the input location future if the `Execution` has a different state. To me this seems an odd responsibility of an `InputLocationsRetriever`. I think it would make more sense to filter such dependencies out at a different level.
   > 3. Why does `MergingSharedSlotProfileRetriever.getSlotProfileFuture` return a future? At the time of scheduling a pipelined region, shouldn't all its inputs and their locations be known?
   > 4. The fix limits the set of input locations to the specified bulk. However, the bulk does not need to contain all `Executions` which can share the slot. Hence, by doing this filtering, we give up potential information about other input locations for tasks which can also run in the same slot. Shouldn't we try to obtain their input locations and only ignore them if we cannot get them?
   > 5. In `PipelinedRegionSchedulingConcurrentFailureTest`, it seems as if `Executions` of `v3` and `v4` are in the same `ExecutionSlotSharingGroup` as `Executions` of `v1` and `v2`. I think this is also the reason why these `ExecutionVertices` are actually asked for their input locations. Given that `v2` and `v3` are separated by a blocking data exchange, how can `Executions` of these vertices share a slot? Wouldn't it also mean that we are asking for a too large slot because `SlotSharingExecutionSlotAllocator.getPhysicalSlotResourceProfile` includes `v1, v2, v3, v4` in one slot?
   > 
   > If there is no strong need for waiting for a future input location, I would say that it would probably be easiest if the `MergingSharedSlotProfileRetriever` checks all `Executions` of its `ExecutionSlotSharingGroup` and collects the available input locations. Based on this information it can then make an approximate decision. That way, it should also ignore currently failed tasks. Moreover, we should check how the `ExecutionSlotSharingGroups` are calculated. There might be a small inaccuracy in it.
   > 
   > cc @azagrebin
   
   Thanks for the thoughtful comments, @tillrohrmann.
   I agree that we can simplify `MergingSharedSlotProfileRetriever` to directly return a `SlotProfile` instead of a future by just collecting the available input locations. This should be enough for pipelined region scheduling, which schedules a region only if all its input regions have finished.
   Regarding the inaccuracy of `ExecutionSlotSharingGroups`, does `#5` or the other comment answer your question?
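
A minimal sketch of the "collect only the available input locations" idea, in plain Java. `AvailableLocations` and `collectAvailable` are hypothetical names and not Flink classes; the generic futures only stand in for the input location futures discussed above, so treat this as an illustration under those assumptions rather than the actual change.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper (not part of Flink): gathers only locations that are already known. */
final class AvailableLocations {

    private AvailableLocations() {}

    /**
     * Returns the values of all futures that have already completed normally.
     * Pending, failed, and cancelled futures are ignored, so the call never blocks.
     */
    static <T> List<T> collectAvailable(Collection<CompletableFuture<T>> locationFutures) {
        List<T> available = new ArrayList<>();
        for (CompletableFuture<T> future : locationFutures) {
            // isCompletedExceptionally() is also true for cancelled futures.
            if (future.isDone() && !future.isCompletedExceptionally()) {
                T location = future.getNow(null);
                if (location != null) {
                    available.add(location);
                }
            }
        }
        return available;
    }
}
```

With this shape, a not yet scheduled or currently failed producer simply contributes no preferred location, and the caller can build its result synchronously from whatever is known at that moment.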
   
   Below are replies to each listed question:
   1. This is an unwanted side effect of restarting all downstream tasks of a failed task to avoid non-determinism issues. I agree we should skip canceling tasks in CREATED. This can avoid a lot of unnecessary cancellation logs, especially for large-scale batch jobs.
   2. Agreed, it's odd. But given that EAGER/LAZY scheduling will be kept for a while and still needs location futures, maybe we can leave it as is. For pipelined region scheduling, maybe we can introduce `PreferredLocationsRetrieverV2` and `InputsLocationsRetrieverV2`, which directly return locations instead of futures? Only available locations will be considered in this case.
   3. If we consider the whole `ExecutionSlotSharingGroup`, some vertices may not have been scheduled yet, so their inputs may not be decided. However, we can change it to consider only the locations available at that time, as mentioned in `#2`.
   4. Yes, you are right. It would be better to take those unscheduled vertices into consideration when calculating preferred input locations.
   5. The case mocks a `DataSet` job. Currently, all job vertices of a `DataSet` job are put into the same `SlotSharingGroup`. I think we need to change it to assign each logical pipelined region a different slot sharing group, similar to how blink planner batch jobs do. This is something we missed when enabling pipelined region scheduling for all kinds of jobs.
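
For `#1`, skipping tasks that are still in CREATED could look roughly like the sketch below. `RestartFilter`, `tasksToCancel`, and the reduced `State` enum are hypothetical stand-ins for illustration only, not the failover strategy's real types, and the sketch filters on CREATED alone without handling terminal states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not Flink code): pick which tasks of a restart set need a cancel call. */
final class RestartFilter {

    /** Simplified stand-in for an execution state enum; only a few states are modeled. */
    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING }

    private RestartFilter() {}

    /**
     * Returns the ids of tasks that have actually been started in some form.
     * Tasks still in CREATED were never scheduled, so there is nothing to cancel for them.
     */
    static List<String> tasksToCancel(Map<String, State> tasksToRestart) {
        List<String> toCancel = new ArrayList<>();
        for (Map.Entry<String, State> entry : tasksToRestart.entrySet()) {
            if (entry.getValue() != State.CREATED) {
                toCancel.add(entry.getKey());
            }
        }
        return toCancel;
    }
}
```

All tasks in the restart set would still be reset and rescheduled; only the cancel step (and its log output) is skipped for the ones that never left CREATED.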
   
   WDYT?

