Posted to user@flink.apache.org by 刘建刚 <li...@gmail.com> on 2021/03/24 04:06:43 UTC

Fail to cancel perJob for that deregisterApplication is not called

      I am using Flink 1.10.0. My job in per-job mode cannot be cancelled.
From the log I find that webMonitorEndpoint.closeAsync() completes, but
deregisterApplication is never called. The relevant code is as follows:

public CompletableFuture<Void> deregisterApplicationAndClose(
      final ApplicationStatus applicationStatus,
      final @Nullable String diagnostics) {

   if (isRunning.compareAndSet(true, false)) {
      final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
         FutureUtils.composeAfterwards(
            webMonitorEndpoint.closeAsync(),
            () -> deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId()));

      return FutureUtils.composeAfterwards(
         closeWebMonitorAndDeregisterAppFuture,
         this::closeAsyncInternal);
   } else {
      return terminationFuture;
   }
}

      For webMonitorEndpoint.closeAsync(), the code is as follows:

public CompletableFuture<Void> closeAsync() {
   synchronized (lock) {
      log.info("State is {}. Shutting down rest endpoint.", state);

      if (state == State.RUNNING) {
         final CompletableFuture<Void> shutDownFuture =
            FutureUtils.composeAfterwards(
               closeHandlersAsync(),
               this::shutDownInternal);

         shutDownFuture.whenComplete(
            (Void ignored, Throwable throwable) -> {
               log.info("Shut down complete.");
               if (throwable != null) {
                  terminationFuture.completeExceptionally(throwable);
               } else {
                  terminationFuture.complete(null);
               }
            });
         state = State.SHUTDOWN;
      } else if (state == State.CREATED) {
         terminationFuture.complete(null);
         state = State.SHUTDOWN;
      }

      return terminationFuture;
   }
}

      I am sure that it completes, based on the log message I added, as
shown here:
[image: image.png]

      For deregisterApplication, however, I do not see any related log
message such as "Shut down cluster because application is in {},
diagnostics {}.".
      Can anyone give me some suggestions? Thank you.
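
For readers following the chain above, here is a minimal standalone sketch of the chaining behavior in question. It does not use Flink: composeAfterwards below is a simplified stand-in written for this illustration (an assumption, not the code of FutureUtils), and actionRanAfter is a hypothetical helper. It shows that the composed action runs only once the first future actually completes, whether normally or exceptionally; if that future stays pending, the action never runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class ComposeAfterwardsSketch {

    // Simplified stand-in (assumption, not Flink's implementation): run `action`
    // once `first` completes, normally or exceptionally, and complete the
    // returned future after the action's own future has completed.
    static CompletableFuture<Void> composeAfterwards(
            CompletableFuture<?> first, Supplier<CompletableFuture<?>> action) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        first.whenComplete((ignored, firstError) -> {
            final CompletableFuture<?> second;
            try {
                second = action.get();
            } catch (Throwable t) {
                result.completeExceptionally(t);
                return;
            }
            second.whenComplete((ignored2, secondError) -> {
                if (firstError != null) {
                    result.completeExceptionally(firstError);
                } else if (secondError != null) {
                    result.completeExceptionally(secondError);
                } else {
                    result.complete(null);
                }
            });
        });
        return result;
    }

    // Hypothetical helper: reports whether the composed action has run
    // for the given "close" future at the time of the call.
    static boolean actionRanAfter(CompletableFuture<?> closeFuture) {
        final AtomicBoolean ran = new AtomicBoolean(false);
        composeAfterwards(closeFuture, () -> {
            ran.set(true);
            return CompletableFuture.completedFuture(null);
        });
        return ran.get();
    }

    public static void main(String[] args) {
        // A future that is never completed: the composed action does not run.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println("pending   -> action ran: " + actionRanAfter(pending));

        // A future that is already complete: the action runs immediately.
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        System.out.println("completed -> action ran: " + actionRanAfter(done));

        // A failed future still triggers the action.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("close failed"));
        System.out.println("failed    -> action ran: " + actionRanAfter(failed));
    }
}
```

Under these assumptions, a deregisterApplication call that never happens would be consistent with the future returned by closeAsync() never completing, so the state of that returned future seems to be the thing worth verifying.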

Re: Fail to cancel perJob for that deregisterApplication is not called

Posted by 刘建刚 <li...@gmail.com>.
Thank you for the answer.

I ran into the same problem again. I added logging to RestServerEndpoint's
closeAsync method as follows:

@Override
public CompletableFuture<Void> closeAsync() {
   synchronized (lock) {
      log.info("State is {}. Shutting down rest endpoint.", state);

      if (state == State.RUNNING) {
         final CompletableFuture<Void> shutDownFuture =
            FutureUtils.composeAfterwards(
               closeHandlersAsync(),
               this::shutDownInternal);

         shutDownFuture.whenComplete(
            (Void ignored, Throwable throwable) -> {
               if (throwable != null) {
                  terminationFuture.completeExceptionally(throwable);
               } else {
                  terminationFuture.complete(null);
               }
               log.info("Shut down complete with {}.", terminationFuture);
            });
         state = State.SHUTDOWN;
      } else if (state == State.CREATED) {
         terminationFuture.complete(null);
         state = State.SHUTDOWN;
      }

      return terminationFuture;
   }
}

After closeAsync completes, DispatcherResourceManagerComponent's
deregisterApplication method is expected to run, chained as follows:

public CompletableFuture<Void> deregisterApplicationAndClose(
      final ApplicationStatus applicationStatus,
      final @Nullable String diagnostics) {

   if (isRunning.compareAndSet(true, false)) {
      final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
         FutureUtils.composeAfterwards(
            webMonitorEndpoint.closeAsync(),
            () -> deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId()));

      return FutureUtils.composeAfterwards(
         closeWebMonitorAndDeregisterAppFuture,
         this::closeAsyncInternal);
   } else {
      return terminationFuture;
   }
}

However, the ResourceManager's deregisterApplication method is never
executed, and I do not know why. Any suggestions?




Chesnay Schepler [via Apache Flink User Mailing List archive.] <
ml+s2336050n42578h34@n4.nabble.com> wrote on Fri, Mar 26, 2021 at 6:54 PM:

> Where exactly did you add your own log message?
>
> WebMonitorEndpoint.closeAsync() already logs on its own whether the
> shutdown future was completed, so it shouldn't have been
> necessary to add a separate log message.
> If you now only see the one you added, chances are that it was added in
> the wrong place.

Re: Fail to cancel perJob for that deregisterApplication is not called

Posted by Chesnay Schepler <ch...@apache.org>.
Where exactly did you add your own log message?

WebMonitorEndpoint.closeAsync() already logs on its own whether the
shutdown future was completed, so it shouldn't have been
necessary to add a separate log message.
If you now only see the one you added, chances are that it was added in
the wrong place.
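
One way to check this without editing the endpoint itself might be to observe the future that closeAsync() returns at the call site. The following is only a sketch under stated assumptions: probe and LOG are hypothetical names invented for this example, plain CompletableFuture is used instead of any Flink class, and a real setup would write to the component's logger rather than a list.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class FutureProbeSketch {

    // Collected messages; stands in for a real logger in this sketch.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Hypothetical helper: records whether `future` was already done when the
    // probe was attached, records how it eventually completes, and returns
    // the same future so the call can be wrapped around an existing expression.
    static <T> CompletableFuture<T> probe(String name, CompletableFuture<T> future) {
        LOG.add(name + " done at registration: " + future.isDone());
        future.whenComplete((value, error) ->
            LOG.add(name + (error == null ? " completed normally" : " failed: " + error)));
        return future;
    }

    public static void main(String[] args) {
        // Stand-in for the termination future returned by closeAsync().
        CompletableFuture<Void> termination = new CompletableFuture<>();
        probe("closeAsync", termination);

        // If this line were never reached, only the first message would appear.
        termination.complete(null);

        LOG.forEach(System.out::println);
    }
}
```

Because probe returns its argument, it could be wrapped around the first argument of the existing composeAfterwards call. If only the registration message ever shows up, the returned future was never completed, regardless of what is logged inside the endpoint.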
