Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 07:26:38 UTC

[GitHub] [flink] zhuzhurk opened a new pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

zhuzhurk opened a new pull request #13628:
URL: https://github.com/apache/flink/pull/13628


   ## What is the purpose of the change
   
   This PR fixes the issue reported in FLINK-19552, in which the JM can crash when concurrent failures happen.
   The root cause is that `MergingSharedSlotProfileRetrieverFactory` incorrectly retrieves preferred locations for vertices that are not scheduled yet, while some of their producer vertices have been canceled, and with them their location futures.
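
The failure mode described above can be pictured with plain `java.util.concurrent` futures. This is only a minimal sketch under stated assumptions: `BulkLocationSketch`, its methods, and the string vertex ids and locations are hypothetical stand-ins, not Flink classes or the code of this PR. It shows that combining location futures for all vertices fails as soon as one of them was canceled, while restricting the combination to the scheduled bulk does not.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; none of these names are Flink classes.
final class BulkLocationSketch {

    /** Combines the location futures of every vertex whose id is contained in vertexIds. */
    static CompletableFuture<Void> combine(
            Map<String, CompletableFuture<String>> locationFutures, Set<String> vertexIds) {
        List<CompletableFuture<String>> selected = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<String>> entry : locationFutures.entrySet()) {
            if (vertexIds.contains(entry.getKey())) {
                selected.add(entry.getValue());
            }
        }
        // allOf completes exceptionally if any of the selected futures did,
        // and a canceled future counts as exceptionally completed.
        return CompletableFuture.allOf(selected.toArray(new CompletableFuture<?>[0]));
    }

    /** Example input: v1 has a known location, the location future for v3 was canceled. */
    static Map<String, CompletableFuture<String>> exampleFutures() {
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        futures.put("v1", CompletableFuture.completedFuture("tm-1"));
        CompletableFuture<String> canceled = new CompletableFuture<>();
        canceled.cancel(false);
        futures.put("v3", canceled);
        return futures;
    }
}
```

Calling `combine` with both `v1` and `v3` yields an exceptionally completed future, whereas calling it with only `v1` (the scheduled bulk in this toy example) completes normally.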
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - *UTs in SlotSharingExecutionSlotAllocatorTest and MergingSharedSlotProfileRetrieverTest*
     - *IT: PipelinedRegionSchedulingConcurrentFailureTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zhuzhurk edited a comment on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
zhuzhurk edited a comment on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708543149


   > Thanks for creating this fix @zhuzhurk. If I understand the problem correctly, then I think the PR fixes the problem. However, I have a couple of related questions:
   > 
   > 1. `PipelinedRegionSchedulingConcurrentFailureTest.testNoImmediateSlotAllocationFailureOnConcurrentFailure` seems to be able to reproduce the problem because the `RestartPipelinedRegionFailoverStrategy` also cancels not yet started downstream consumers. Why is this the case? Shouldn't it only be necessary to cancel `Executions` which are not `CREATED`?
   > 2. `ExecutionGraphToInputsLocationsRetrieverAdapter` seems to solve the problem of deadlocking the `MergingSharedSlotProfileRetriever` by returning an input future of an execution which is about to be scheduled by checking on the `CREATED` state. However, it returns the input location future if the `Execution` has a different state. To me this seems an odd responsibility of an `InputLocationsRetriever`. I think it would make more sense to filter such dependencies out at a different level.
   > 3. Why does `MergingSharedSlotProfileRetriever.getSlotProfileFuture` return a future? At the time of scheduling a pipelined region, shouldn't all its inputs and their locations be known?
   > 4. The fix limits the set of input locations to the specified bulk. However, the bulk does not need to contain all `Executions` which can share the slot. Hence, by doing this filtering, we give up potential information about other input locations for tasks which can also run in the same slot. Shouldn't we try to obtain their input locations and only ignore them if we cannot get them?
   > 5. In `PipelinedRegionSchedulingConcurrentFailureTest`, it seems as if `Executions` of `v3` and `v4` are in the same `ExecutionSlotSharingGroup` as `Executions` of `v1` and `v2`. I think this is also the reason why these `ExecutionVertices` are actually asked for their input locations. Given that `v2` and `v3` are separated by a blocking data exchange, how can `Executions` of these vertices share a slot? Wouldn't it also mean that we are asking for a slot that is too large because `SlotSharingExecutionSlotAllocator.getPhysicalSlotResourceProfile` includes `v1, v2, v3, v4` in one slot?
   > 
   > If there is no strong need for waiting for a future input location, I would say that it would probably be easiest if the `MergingSharedSlotProfileRetriever` checks all `Executions` of its `ExecutionSlotSharingGroup` and collects the available input locations. Based on this information it can then make an approximate decision. That way, it should also ignore currently failed tasks. Moreover, we should check how the `ExecutionSlotSharingGroups` are calculated. There might be a small inaccuracy in it.
   > 
   > cc @azagrebin
   
   Thanks for the thoughtful comments, @tillrohrmann.
   I agree that we can simplify `MergingSharedSlotProfileRetriever` to directly return a `SlotProfile` instead of a future by just collecting the available input locations. This should be enough for pipelined region scheduling, which schedules a region only if all its input regions have finished.
   Regarding the inaccuracy of `ExecutionSlotSharingGroups`, does `#5` or the other comment answer your question?
   
   Below are replies to each listed question:
   1. This is an unwanted side effect of restarting all downstream tasks of a failed task to avoid nondeterminism issues. I agree we should skip canceling tasks in CREATED. This can reduce a lot of unnecessary canceling logs, especially for large-scale batch jobs.
   2. Agreed, it's odd. But given that EAGER/LAZY scheduling will be kept for a while and still needs location futures, maybe we can leave it as is. For pipelined region scheduling, we could instead introduce `PreferredLocationsRetrieverV2` and `InputsLocationsRetrieverV2`, which directly return locations instead of futures. Only available locations would be considered in this case.
   3. When considering the whole `ExecutionSlotSharingGroup`, some vertices may not have been scheduled yet, so their input locations may not be decided. However, we can change it to just consider the locations available at that time, as mentioned in `#2`.
   4. Yes, you are right. It would be better to take those unscheduled vertices into consideration when calculating preferred input locations.
   5. The case mocks a `DataSet` job. Currently, all job vertices of a `DataSet` job are put into the same `SlotSharingGroup`. I think we need to change this to assign each logical pipelined region a different slot sharing group, similar to what blink planner batch jobs do. This is something we missed after enabling pipelined region scheduling for all kinds of jobs.
   
   In summary, here are the actions to take:
   1. Fix FLINK-19552 in a different way, i.e. by introducing `PreferredLocationsRetrieverV2` and `InputsLocationsRetrieverV2`, which directly return locations and consider only the available ones
   2. Change default `SlotSharingGroup` setting for DataSet jobs by assigning each logical pipelined region a different slot sharing group
   3. Change region failover to skip restarting CREATED tasks
   
   WDYT?
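
The idea of returning only the locations that are already available, instead of waiting on futures, could look roughly like the sketch below. It is an illustration under assumptions: `AvailableLocationsSketch` and `availableLocations` are hypothetical names, locations are plain strings, and the proposed `PreferredLocationsRetrieverV2` / `InputsLocationsRetrieverV2` interfaces are not shown here because their signatures are not given in this thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not the proposed V2 retriever interfaces.
final class AvailableLocationsSketch {

    /** Returns the locations of all futures that have already completed normally. */
    static List<String> availableLocations(Collection<CompletableFuture<String>> locationFutures) {
        List<String> locations = new ArrayList<>();
        for (CompletableFuture<String> future : locationFutures) {
            // Pending, failed and canceled futures are skipped instead of being waited on.
            if (future.isDone() && !future.isCompletedExceptionally()) {
                locations.add(future.join());
            }
        }
        return locations;
    }
}
```

Because nothing is awaited, a canceled or failed producer simply contributes no preferred location, which matches the "approximate decision" suggested in the quoted review.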





[GitHub] [flink] tillrohrmann edited a comment on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
tillrohrmann edited a comment on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708377123


   A somewhat related question concerning the `ExecutionSlotSharingGroup` is the following: How will the `ExecutionSlotSharingGroup` be calculated if we have the following JobGraph:
   
   ```
   v1 --> v2
   v3 --> v4
   ```
   
   here `v1, v2, v3, v4` are `JobVertices` and `-->` is a blocking data exchange. Given this specification, `v1` and `v2` cannot share a slot because of the blocking data exchange. The same applies to `v3` and `v4`. However, `v3` or `v4` could share the slot with `v1`. How is this solved? Will the effective execution slot sharing group for `v1` contain `v3` and `v4` or will we say `v1` can share with `v3` but not `v4`?





[GitHub] [flink] zhuzhurk commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708509757


   > A somewhat related question concerning the `ExecutionSlotSharingGroup` is the following: How will the `ExecutionSlotSharingGroup` be calculated if we have the following JobGraph:
   > 
   > ```
   > v1 --> v2
   > v3 --> v4
   > ```
   > 
   > here `v1, v2, v3, v4` are `JobVertices` and `-->` is a blocking data exchange. Given this specification, `v1` and `v2` cannot share a slot because of the blocking data exchange. The same applies to `v3` and `v4`. However, `v3` or `v4` could share the slot with `v1`. How is this solved? Will the effective execution slot sharing group for `v1` contain `v3` and `v4` or will we say `v1` can share with `v3` but not `v4`?
   
   `ExecutionSlotSharingGroup` depends on how we assign a `SlotSharingGroup` to each `JobVertex`. This means that if v1, v2, v3, v4 are in the same `SlotSharingGroup`, then we can have an `ExecutionSlotSharingGroup` {ev11, ev21, ev31, ev41}. So I think your question here is about how we set the `SlotSharingGroup` of job vertices.
   The ways to set the default `SlotSharingGroup` for job vertices currently differ between streaming jobs (including DataStream and Table/SQL streaming), blink planner batch jobs and DataSet jobs.
    - For streaming jobs, all job vertices are by default in the same "default" `SlotSharingGroup`. 
    - For blink planner jobs, each logical pipelined region corresponds to a `SlotSharingGroup`.
    - For DataSet jobs, all job vertices are by default in the same "default" `SlotSharingGroup`. This is not ideal and I think should be changed to be aligned with blink planner jobs.
   
   Regarding whether we should put v1 and v3 in the same `SlotSharingGroup`, I think it is not really necessary and it can complicate things. It is not necessary because sharing a slot would not improve their input locality, and, unlike for streaming jobs, it does not simplify reasoning about resources.
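
To make the relationship between a `SlotSharingGroup` and the resulting `ExecutionSlotSharingGroup` concrete, here is a deliberately simplified sketch. It assumes that all job vertices have the same parallelism and that executions with the same subtask index in the same group share a slot; the class `ExecutionSlotSharingSketch`, the string ids, and the naming scheme `ev<vertex><subtask>` are hypothetical, and Flink's actual computation may consider more than this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; a simplified model, not Flink's slot sharing strategy.
final class ExecutionSlotSharingSketch {

    /**
     * Groups executions by (slot sharing group, subtask index).
     *
     * @param groupOfVertex job vertex id mapped to the name of its slot sharing group
     * @param parallelism number of subtasks per job vertex (assumed uniform)
     * @return one list of execution vertex ids per execution slot sharing group
     */
    static List<List<String>> executionGroups(Map<String, String> groupOfVertex, int parallelism) {
        Map<String, List<String>> byGroupAndSubtask = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : groupOfVertex.entrySet()) {
            for (int subtask = 1; subtask <= parallelism; subtask++) {
                String key = entry.getValue() + "#" + subtask;
                // "v1" with subtask 1 becomes "ev11", following the notation in the comment above.
                byGroupAndSubtask
                        .computeIfAbsent(key, k -> new ArrayList<>())
                        .add("e" + entry.getKey() + subtask);
            }
        }
        return new ArrayList<>(byGroupAndSubtask.values());
    }
}
```

With v1 to v4 all in one "default" group and parallelism one, this yields the single group {ev11, ev21, ev31, ev41} mentioned above; with one group per vertex it yields four singleton groups.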








[GitHub] [flink] flinkbot edited a comment on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708243910


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4150a04838dfeb128adaa3fb195805ff01bdaae6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7588",
       "triggerID" : "4150a04838dfeb128adaa3fb195805ff01bdaae6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4150a04838dfeb128adaa3fb195805ff01bdaae6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7588) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708243910


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4150a04838dfeb128adaa3fb195805ff01bdaae6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7588",
       "triggerID" : "4150a04838dfeb128adaa3fb195805ff01bdaae6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4150a04838dfeb128adaa3fb195805ff01bdaae6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7588) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>








[GitHub] [flink] flinkbot commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708216550


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 6187d12a0e548c97ff4129020fd5bd93d02923a9 (Wed Oct 14 07:28:15 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] azagrebin commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
azagrebin commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-709376678


   I generally agree with the proposed steps. We should try to take into account locations of all available producers for a given `SlotSharingGroup`, hence futures are not needed in case of pipelined region scheduling.
   
   I have a note about `SlotSharingGroup` assignment for batch scheduling:
   
   > each logical pipelined region has a different `SlotSharingGroup`
   
   Although this assignment simplifies things,
   consider @tillrohrmann's example with a parallelism of one (as I understand it, `v1 --> v2` and `v3 --> v4` are basically disjoint pipelines):
   
   > ```
   > v1 --> v2
   > v3 --> v4
   > ```
   > 
   > here `v1, v2, v3, v4` are `JobVertices` and `-->` is a blocking data exchange
   
   With this assignment, the v11 and v31 executions cannot run at the same time if there is only one slot.
   I am not sure how important this is for performance; it probably depends on the use case.
   On the other hand, deciding on such fine-grained slot sharing for a bigger example may be a non-trivial task, and it could be left to the user to decide via the API.
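
The default discussed here, one slot sharing group per logical pipelined region, can be sketched as follows. This is only an illustration under assumptions: `RegionGroupSketch`, `Edge`, and the `region-...` group names are hypothetical, and regions are derived with a small union-find over non-blocking edges, which is one possible way to compute them, not necessarily how Flink does it. Every vertex referenced by an edge is assumed to be in the vertex list.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not Flink's region or slot sharing group computation.
final class RegionGroupSketch {

    /** A data exchange between two job vertices; blocking exchanges separate regions. */
    static final class Edge {
        final String from;
        final String to;
        final boolean blocking;

        Edge(String from, String to, boolean blocking) {
            this.from = from;
            this.to = to;
            this.blocking = blocking;
        }
    }

    /** Maps every job vertex to a group name that is unique per pipelined region. */
    static Map<String, String> assignGroups(List<String> vertices, List<Edge> edges) {
        Map<String, String> parent = new LinkedHashMap<>();
        for (String vertex : vertices) {
            parent.put(vertex, vertex);
        }
        // Vertices connected by pipelined (non-blocking) edges end up in the same region.
        for (Edge edge : edges) {
            if (!edge.blocking) {
                parent.put(find(parent, edge.from), find(parent, edge.to));
            }
        }
        Map<String, String> groupOfVertex = new LinkedHashMap<>();
        for (String vertex : vertices) {
            groupOfVertex.put(vertex, "region-" + find(parent, vertex));
        }
        return groupOfVertex;
    }

    /** Follows parent links until the representative of the region is reached. */
    private static String find(Map<String, String> parent, String vertex) {
        while (!parent.get(vertex).equals(vertex)) {
            vertex = parent.get(vertex);
        }
        return vertex;
    }
}
```

For the example with two blocking exchanges, all four vertices land in different groups, so with a single slot v11 and v31 would have to run one after the other, which is the trade-off raised in this comment.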
   





[GitHub] [flink] tillrohrmann commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-711915100


   @zhuzhurk and @azagrebin have you already created the follow up issues?





[GitHub] [flink] azagrebin commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
azagrebin commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-721119999


   Closing in favour of #13730.





[GitHub] [flink] azagrebin commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
azagrebin commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-709881257


   > 2. v1 and v3 do not have input dependency, so putting them into the same slot does not reduce data exchange cost, compared to tasks in the same pipelined region
   
   I meant the execution speed-up from running v1 and v3 at the same time in the same slot (this makes sense without dynamic slot allocation, which is mostly the case now).
   
   > 3. If we can have one slot which runs v11 and v31 together, it indicates that the resources are actually enough to be split into 2 slots, one for v11 and one for v31.
   
   This is indeed not an issue when we have dynamic slot allocation.
   
   > So I think assigning each logical pipelined region to a different SlotSharingGroup is enough and simpler.
   > Moreover, as you mentioned, this is just the default behavior and we can enable users to set `SlotSharingGroup` for tasks via API if there were strong requirements for that.
   
   Agree, it is generally hard for Flink to decide which regions can run together in batch.





[GitHub] [flink] tillrohrmann commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708954170


   Hi @zhuzhurk, thanks for answering my questions so swiftly. I agree with your proposal to fix the problem in the following 3 steps:
   
   >1. Fix FLINK-19552 in a different way, i.e. by introducing PreferredLocationsRetrieverV2 and InputsLocationsRetrieverV2, which directly return locations and consider only the available ones
   >2. Change default SlotSharingGroup setting for DataSet jobs by assigning each logical pipelined region a different slot sharing group
   >3. Change region failover to skip restarting CREATED tasks
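
Step 3 can be illustrated with a minimal sketch. The `State` enum and the `tasksToRestart` helper below are hypothetical names made up for this example and are not Flink's actual classes; the sketch only shows the idea of leaving out tasks that are still in the CREATED state when a region is restarted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names are Flink's real classes.
class SkipCreatedTasksSketch {

    // Simplified stand-in for an execution state.
    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING }

    // Returns the tasks of a failed region that need a restart:
    // every task that has already left the CREATED state.
    static List<String> tasksToRestart(Map<String, State> regionTasks) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, State> entry : regionTasks.entrySet()) {
            if (entry.getValue() != State.CREATED) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, State> tasks = new LinkedHashMap<>();
        tasks.put("v1_0", State.RUNNING);
        tasks.put("v2_0", State.DEPLOYING);
        tasks.put("v3_0", State.CREATED); // never started, nothing to cancel
        System.out.println(tasksToRestart(tasks));
    }
}
```

Tasks that were never started have nothing to cancel, so filtering them out also avoids the unnecessary cancellation logs mentioned in the discussion.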





[GitHub] [flink] azagrebin closed pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
azagrebin closed pull request #13628:
URL: https://github.com/apache/flink/pull/13628


   





[GitHub] [flink] zhuzhurk commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-709856832


   @azagrebin I don't think it is really necessary to put v1 and v3 in the same SlotSharingGroup. The reasons are:
   1. For streaming jobs, putting all vertices in the same slot sharing group makes it easier for users to decide the job parallelism with regard to the cluster resources. But for batch jobs, this is not needed.
   2. v1 and v3 do not have input dependency, so putting them into the same slot does not reduce data exchange cost, compared to tasks in the same pipelined region
   3. If we can have one slot which runs v11 and v31 together, it indicates that the resources are actually enough to be split into 2 slots, one for v11 and one for v31.
   
   So I think assigning each logical pipelined region to a different SlotSharingGroup is enough and simpler. 
   Moreover, as you mentioned, this is just the default behavior and we can enable users to set `SlotSharingGroup` for tasks via API if there were strong requirements for that.
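
The proposed default can be sketched roughly as follows, assuming a logical pipelined region is a set of job vertices connected by pipelined (non-blocking) edges. The `Edge` type and `assignGroups` method are hypothetical and only illustrate the idea with a small union-find; this is not how Flink computes regions internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one slot sharing group per logical pipelined region.
class RegionSlotSharingSketch {

    // Illustrative edge type; pipelined == false means a blocking data exchange.
    static final class Edge {
        final int source;
        final int target;
        final boolean pipelined;

        Edge(int source, int target, boolean pipelined) {
            this.source = source;
            this.target = target;
            this.pipelined = pipelined;
        }
    }

    // Union-find lookup with path halving.
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    // Returns, for each vertex index, the index of its slot sharing group.
    static int[] assignGroups(int vertexCount, List<Edge> edges) {
        int[] parent = new int[vertexCount];
        for (int i = 0; i < vertexCount; i++) {
            parent[i] = i;
        }
        // Only pipelined edges merge vertices into the same region.
        for (Edge e : edges) {
            if (e.pipelined) {
                int sourceRoot = find(parent, e.source);
                int targetRoot = find(parent, e.target);
                parent[sourceRoot] = targetRoot;
            }
        }
        // Number the regions in order of first appearance.
        Map<Integer, Integer> groupOfRoot = new HashMap<>();
        int[] group = new int[vertexCount];
        for (int v = 0; v < vertexCount; v++) {
            int root = find(parent, v);
            Integer g = groupOfRoot.get(root);
            if (g == null) {
                g = groupOfRoot.size();
                groupOfRoot.put(root, g);
            }
            group[v] = g;
        }
        return group;
    }

    public static void main(String[] args) {
        // v1 --> v2 and v3 --> v4, both blocking (indices 0..3).
        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(0, 1, false));
        edges.add(new Edge(2, 3, false));
        System.out.println(Arrays.toString(assignGroups(4, edges)));
    }
}
```

With only blocking exchanges, every vertex forms its own region and therefore gets its own group, so v1 and v3 never end up sharing a slot by default.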





[GitHub] [flink] flinkbot commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708243910


   ## CI report:
   
   * 4150a04838dfeb128adaa3fb195805ff01bdaae6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] zhuzhurk edited a comment on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
zhuzhurk edited a comment on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708543149


   > Thanks for creating this fix @zhuzhurk. If I understand the problem correctly, then I think the PR fixes the problem. However, I have a couple of related questions:
   > 
   > 1. `PipelinedRegionSchedulingConcurrentFailureTest.testNoImmediateSlotAllocationFailureOnConcurrentFailure` seems to be able to reproduce the problem because the `RestartPipelinedRegionFailoverStrategy` also cancels downstream consumers that have not started yet. Why is this the case? Shouldn't it only be necessary to cancel `Executions` which are not `CREATED`?
   > 2. `ExecutionGraphToInputsLocationsRetrieverAdapter` seems to solve the problem of deadlocking the `MergingSharedSlotProfileRetriever` by returning an input future of an execution which is about to be scheduled by checking on the `CREATED` state. However, it returns the input location future if the `Execution` has a different state. To me this seems an odd responsibility of an `InputLocationsRetriever`. I think it would make more sense to filter such dependencies out at a different level.
   > 3. Why does `MergingSharedSlotProfileRetriever.getSlotProfileFuture` return a future? At the time of scheduling a pipelined region, shouldn't all its inputs and their locations be known?
   > 4. The fix limits the set of input locations to the specified bulk. However, the bulk does not need to contain all `Executions` which can share the slot. Hence, by doing this filtering, we give up potential information about other input locations for tasks which can also run in the same slot. Shouldn't we try to obtain their input locations and only ignore them if we cannot get them?
   > 5. In `PipelinedRegionSchedulingConcurrentFailureTest`, it seems as if `Executions` of `v3` and `v4` are in the same `ExecutionSlotSharingGroup` as `Executions` of `v1` and `v2`. I think this is also the reason why these `ExecutionVertices` are actually asked for their input locations. Given that `v2` and `v3` are separated by a blocking data exchange, how can `Executions` of these vertices share a slot? Wouldn't it also mean that we are asking for too large a slot because `SlotSharingExecutionSlotAllocator.getPhysicalSlotResourceProfile` includes `v1, v2, v3, v4` in one slot?
   > 
   > If there is no strong need for waiting for a future input location, I would say that it would probably be easiest if the `MergingSharedSlotProfileRetriever` checks all `Executions` of its `ExecutionSlotSharingGroup` and collects the available input locations. Based on this information it can then make an approximate decision. That way, it should also ignore currently failed tasks. Moreover, we should check how the `ExecutionSlotSharingGroups` are calculated. There might be a small inaccuracy in it.
   > 
   > cc @azagrebin
   
   Thanks for the thoughtful comments, @tillrohrmann.
   I agree that we can simplify `MergingSharedSlotProfileRetriever` to directly return a `SlotProfile` instead of a future by just collecting the available input locations. This should be enough for pipelined region scheduling, which schedules a region only if all its input regions have finished.
   Regarding the inaccuracy of `ExecutionSlotSharingGroups`, does `#5` or the other comment answer your question?
   
   Below are replies to each listed question:
   1. This is an unwanted side effect of restarting all downstream tasks of a failed task to avoid nondeterminism issues. I agree we should skip canceling tasks that are still in the CREATED state. This can avoid a lot of unnecessary cancellation logs, especially for large-scale batch jobs.
   2. Agreed, it's odd. But given that EAGER/LAZY scheduling will be kept for a while and still needs location futures, maybe we can leave it as is. For pipelined region scheduling, could we instead introduce `PreferredLocationsRetrieverV2` and `InputsLocationsRetrieverV2`, which directly return locations instead of futures? Only available locations would be considered in this case.
   3. If we consider the whole `ExecutionSlotSharingGroup`, some vertices may not have been scheduled yet, so the locations of their inputs may not be decided. However, we can change it to consider only the locations available at that time, as mentioned in `#2`.
   4. Yes, you are right. It would be better to take those unscheduled vertices into consideration when calculating preferred input locations.
   5. The case mocks a `DataSet` job. Currently, all job vertices of a `DataSet` job are put into the same `SlotSharingGroup`. I think we need to change this to assign each logical pipelined region a different slot sharing group, similar to how blink planner batch jobs do. This is something we missed after enabling pipelined region scheduling for all kinds of jobs.
   
   In summary, here are the actions to take:
   1. Fix FLINK-19552 in a different way, i.e. introduce `PreferredLocationsRetrieverV2` and `InputsLocationsRetrieverV2`, which directly return locations and consider only the available ones
   2. Change the default `SlotSharingGroup` for DataSet jobs
   3. Make region failover skip restarting CREATED tasks
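
To make action 1 more concrete, here is a minimal sketch of "consider only the available locations", assuming each input location is exposed as a `CompletableFuture`. The class and method names are hypothetical and this is not the proposed retriever API; it only shows how pending, failed and cancelled futures can be ignored without blocking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: not the actual retriever interfaces.
class AvailableLocationsSketch {

    // Collects the values of all futures that have already completed normally.
    // Pending, failed and cancelled futures are skipped, so this never blocks.
    static <T> List<T> collectAvailable(Collection<CompletableFuture<T>> futures) {
        List<T> available = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            // isCompletedExceptionally() is also true for cancelled futures.
            if (future.isDone() && !future.isCompletedExceptionally()) {
                available.add(future.join());
            }
        }
        return available;
    }

    public static void main(String[] args) {
        CompletableFuture<String> known = CompletableFuture.completedFuture("tm-1");
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(false); // e.g. the producer vertex was cancelled

        System.out.println(collectAvailable(Arrays.asList(known, pending, cancelled)));
    }
}
```

Because cancelled location futures are simply ignored, a concurrent failure of a producer can no longer propagate into the slot profile computation.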
   
   WDYT?





[GitHub] [flink] tillrohrmann commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-708377123


   A somewhat related question concerning the `ExecutionSlotSharingGroup` is the following: How will the `ExecutionSlotSharingGroup` be calculated if we have the following JobGraph:
   
   ```
   v1 --> v2
   v3 --> v4
   ```
   
   Here `v1, v2, v3, v4` are `JobVertices` and `-->` is a blocking data exchange. Given this specification, `v1` and `v2` cannot share a slot because of the blocking data exchange. The same applies to `v3` and `v4`. However, `v3` or `v4` could share a slot with `v1`. How is this solved? Will the effective execution slot sharing group for `v1` contain `v3` and `v4`?





[GitHub] [flink] zhuzhurk commented on pull request #13628: [FLINK-19552][runtime] Get preferred locations of an ExecutionSlotSharingGroup only for its executions from the scheduled bulk

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on pull request #13628:
URL: https://github.com/apache/flink/pull/13628#issuecomment-712128555


   @tillrohrmann I just opened FLINK-19712 and FLINK-19714 to track them.

