You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by 刘建刚 <li...@gmail.com> on 2021/03/24 04:06:43 UTC
Fail to cancel perJob for that deregisterApplication is not called
I am using flink 1.10.0. My perJob can not be cancelled. From the log
I find that webMonitorEndpoint.closeAsync() is completed but
deregisterApplication is not called. The related code is as follows:
public CompletableFuture<Void> deregisterApplicationAndClose(
final ApplicationStatus applicationStatus,
final @Nullable String diagnostics) {
if (isRunning.compareAndSet(true, false)) {
final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () ->
deregisterApplication(applicationStatus, diagnostics,
resourceManager.getJobId()));
return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture,
this::closeAsyncInternal);
} else {
return terminationFuture;
}
}
For webMonitorEndpoint.closeAsync(), the code is as follows:
public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
log.info("State is {}. Shutting down rest endpoint.", state);
if (state == State.RUNNING) {
final CompletableFuture<Void> shutDownFuture =
FutureUtils.composeAfterwards(
closeHandlersAsync(),
this::shutDownInternal);
shutDownFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
log.info("Shut down complete.");
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}
});
state = State.SHUTDOWN;
} else if (state == State.CREATED) {
terminationFuture.complete(null);
state = State.SHUTDOWN;
}
return terminationFuture;
}
}
I am sure that it is completed with the log I added as follows:
[image: image.png]
For deregisterApplication, I do not see any related log like "Shut
down cluster because application is in {}, diagnostics {}.".
Can anyone give me some suggestions? Thank you.
Re: Fail to cancel perJob for that deregisterApplication is not called
Posted by 刘建刚 <li...@gmail.com>.
Thank you for the answer.
I met the same problem again. I add log in RestServerEndpoint's closeAsync
method as following:
@Override
public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
log.info("State is {}. Shutting down rest endpoint.", state);
if (state == State.RUNNING) {
final CompletableFuture<Void> shutDownFuture =
FutureUtils.composeAfterwards(
closeHandlersAsync(),
this::shutDownInternal);
shutDownFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}
log.info("Shut down complete with {}.", terminationFuture);
});
state = State.SHUTDOWN;
} else if (state == State.CREATED) {
terminationFuture.complete(null);
state = State.SHUTDOWN;
}
return terminationFuture;
}
}
After closeAsync, it is expected to
execute DispatcherResourceManagerComponent's deregisterApplication method
as following:
public CompletableFuture<Void> deregisterApplicationAndClose(
final ApplicationStatus applicationStatus,
final @Nullable String diagnostics) {
if (isRunning.compareAndSet(true, false)) {
final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () ->
deregisterApplication(applicationStatus, diagnostics,
resourceManager.getJobId()));
return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture,
this::closeAsyncInternal);
} else {
return terminationFuture;
}
}
However, Resource's deregisterApplication method is not executed. I do not
know why. Any suggestions?
Chesnay Schepler [via Apache Flink User Mailing List archive.] <
ml+s2336050n42578h34@n4.nabble.com> 于2021年3月26日周五 下午6:54写道:
> Where exactly did you add your own log message?
>
> WebMonitorEndpoint.closeAsync() already logs on it's own whether the
> shutdown future was completed; meaning that it shouldn't have been
> necessary to add a separate log message.
> If you now only see the one you added, chances are that it was added at
> the wrong place.
>
> On 3/24/2021 5:06 AM, 刘建刚 wrote:
>
> I am using flink 1.10.0. My perJob can not be cancelled. From the
> log I find that webMonitorEndpoint.closeAsync() is completed but
> deregisterApplication is not called. The related code is as follows:
>
> public CompletableFuture<Void> deregisterApplicationAndClose(
> final ApplicationStatus applicationStatus, final @Nullable String diagnostics) {
>
> if (isRunning.compareAndSet(true, false)) {
> final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
> FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () ->
> deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId())); return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal); } else {
> return terminationFuture; }
> }
>
> For webMonitorEndpoint.closeAsync(), the code is as follows:
>
> public CompletableFuture<Void> closeAsync() {
> synchronized (lock) {
> log.info("State is {}. Shutting down rest endpoint.", state); if (state == State.RUNNING) {
> final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
> closeHandlersAsync(), this::shutDownInternal); shutDownFuture.whenComplete(
> (Void ignored, Throwable throwable) -> {
> log.info("Shut down complete."); if (throwable != null) {
> terminationFuture.completeExceptionally(throwable); } else {
> terminationFuture.complete(null); }
> }); state = State.SHUTDOWN; } else if (state == State.CREATED) {
> terminationFuture.complete(null); state = State.SHUTDOWN; }
>
> return terminationFuture; }
> }
>
> I am sure that it is completed with the log I added as follows:
> [image: image.png]
>
> For deregisterApplication, I do not see any related log like "Shut
> down cluster because application is in {}, diagnostics {}.".
> Can anyone give me some suggestions? Thank you.
>
>
>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fail-to-cancel-perJob-for-that-deregisterApplication-is-not-called-tp42499p42578.html
> To start a new topic under Apache Flink User Mailing List archive., email
> ml+s2336050n1h47@n4.nabble.com
> To unsubscribe from Apache Flink User Mailing List archive., click here
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=bGl1amlhbmdhbmdwZW5nQGdtYWlsLmNvbXwxfC0xMTYwNzM3MjI=>
> .
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>
Re: Fail to cancel perJob for that deregisterApplication is not
called
Posted by Chesnay Schepler <ch...@apache.org>.
Where exactly did you add your own log message?
WebMonitorEndpoint.closeAsync() already logs on it's own whether the
shutdown future was completed; meaning that it shouldn't have been
necessary to add a separate log message.
If you now only see the one you added, chances are that it was added at
the wrong place.
On 3/24/2021 5:06 AM, 刘建刚 wrote:
> I am using flink 1.10.0. My perJob can not be cancelled. From
> the log I find that webMonitorEndpoint.closeAsync() is completed but
> deregisterApplication is not called. The related code is as follows:
> public CompletableFuture<Void>deregisterApplicationAndClose(
> final ApplicationStatus applicationStatus, final @Nullable String diagnostics) {
>
> if (isRunning.compareAndSet(true, false)) {
> final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
> FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () ->
> deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId())); return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal); }else {
> return terminationFuture; }
> }
> For webMonitorEndpoint.closeAsync(), the code is as follows:
> public CompletableFuture<Void>closeAsync() {
> synchronized (lock) {
> log.info("State is {}. Shutting down rest endpoint.", state); if (state == State.RUNNING) {
> final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
> closeHandlersAsync(), this::shutDownInternal); shutDownFuture.whenComplete(
> (Void ignored, Throwable throwable) -> {
> log.info("Shut down complete."); if (throwable !=null) {
> terminationFuture.completeExceptionally(throwable); }else {
> terminationFuture.complete(null); }
> }); state = State.SHUTDOWN; }else if (state == State.CREATED) {
> terminationFuture.complete(null); state = State.SHUTDOWN; }
>
> return terminationFuture; }
> }
> I am sure that it is completed with the log I added as follows:
> image.png
> For deregisterApplication, I do not see any related log like
> "Shut down cluster because application is in {}, diagnostics {}.".
> Can anyone give me some suggestions? Thank you.
>