You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/07 15:09:00 UTC

[GitHub] [flink] dmvk commented on pull request #18189: [FLINK-25430] Replace RunningJobRegistry by JobResultStore

dmvk commented on pull request #18189:
URL: https://github.com/apache/flink/pull/18189#issuecomment-1007482147


   It would be executed in the same thread that has called the complete method
   (which belongs to the pool). Passing the executor directly allows for
   picking another thread from the pool. Also the control mechanism it's kind
   of more explicit as you can never be sure that the "complete" method has
   been called by the thread from previous stage.
   
   On Fri, Jan 7, 2022 at 4:04 PM Matthias Pohl ***@***.***>
   wrote:
   
   > ***@***.**** commented on this pull request.
   > ------------------------------
   >
   > In
   > flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
   > <https://github.com/apache/flink/pull/18189#discussion_r780321496>:
   >
   > > +        return CompletableFuture.supplyAsync(
   > +                        this::getGloballyCompletedJobResultsIfRunning, ioExecutor)
   > +                .thenCompose(
   > +                        globallyTerminatedJobs ->
   > +                                CompletableFuture.supplyAsync(
   > +                                                () ->
   > +                                                        this.recoverJobsIfRunning(
   > +                                                                globallyTerminatedJobs.stream()
   > +                                                                        .map(JobResult::getJobId)
   > +                                                                        .collect(
   > +                                                                                Collectors
   > +                                                                                        .toSet())),
   > +                                                ioExecutor)
   > +                                        .thenAccept(
   > +                                                jobGraphs ->
   > +                                                        createDispatcherIfRunning(
   > +                                                                jobGraphs, globallyTerminatedJobs))
   > +                                        .handle(this::onErrorIfRunning));
   >
   > I'm wondering whether the dirtyJobsFuture.thenApplyAsync is necessary
   > here. Wouldn't it be executed in the same thread pool as the previous
   > supplyAsync since both are chained together?
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/flink/pull/18189#discussion_r780321496>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AACJGBL5ED6XK77BRIONM4DUU36GPANCNFSM5KUWUGAA>
   > .
   > Triage notifications on the go with GitHub Mobile for iOS
   > <https://apps.apple.com/app/apple-store/id1477376905?ct=notification-email&mt=8&pt=524675>
   > or Android
   > <https://play.google.com/store/apps/details?id=com.github.android&referrer=utm_campaign%3Dnotification-email%26utm_medium%3Demail%26utm_source%3Dgithub>.
   >
   > You are receiving this because you commented.Message ID:
   > ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org