Posted to user@flink.apache.org by "DONG, Weike" <ky...@connect.hku.hk> on 2020/03/13 04:30:46 UTC

Flink YARN app terminated before the client receives the result

Hi,

Recently I have encountered a strange behavior of Flink on YARN, which is
that when I try to cancel a Flink job running in per-job mode on YARN using
commands like

"cancel -m yarn-cluster
-yid application_1559388106022_9412 ed7e2e0ab0a7316c1b65df6047bc6aae"

the client happily found and connected to the ResourceManager and then gets stuck at
Found Web Interface 172.28.28.3:50099 of application 'application_1559388106022_9412'.

And after one minute, an exception is thrown at the client side:
Caused by: org.apache.flink.util.FlinkException: Could not cancel job
ed7e2e0ab0a7316c1b65df6047bc6aae.
    at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
    at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
    at org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    ... 20 more
Caused by: java.util.concurrent.TimeoutException
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
    ... 27 more
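
From the stack trace, the client is simply blocked on a future that is never
completed before its timeout expires, because the server goes away without
answering. A minimal, JDK-only sketch of that failure mode (not Flink code;
class name and the 2-second timeout are made up):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ClientTimeoutSketch {
    public static void main(String[] args) {
        // The client sends the cancel request and then waits on this future;
        // if the server shuts down before writing a response, nothing ever
        // completes it.
        CompletableFuture<Void> cancelResponse = new CompletableFuture<>();
        try {
            // Roughly what CliFrontend does: block with a timeout
            // (the real client waits much longer than 2 seconds).
            cancelResponse.get(2, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // This is the TimeoutException that ends up wrapped in the
            // FlinkException shown above.
            System.out.println("Timed out waiting for the cancel response");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}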

Then I discovered that the YARN app had already terminated with FINISHED
state and KILLED final status.
And after digging into the log of this finished YARN app, I have found that
TaskManager had already received the SIGTERM signal and terminated
gracefully.
org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
SIGTERM. Shutting down as requested.

Also, the log of JobManager shows that it terminated with exit code 0.
Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0

However, the JobManager did not return anything to the client before its
shutdown, which is different from previous versions (like Flink 1.9).

I wonder if this is a new bug in the flink-clients or flink-yarn module?

Thank you : )

Sincerely,
Weike

Re: Flink YARN app terminated before the client receives the result

Posted by Aljoscha Krettek <al...@apache.org>.
I think we have to take a step back here. For per-job (YARN) mode, the 
general problem is that there are two systems that can do shutdown (and 
other things) and two clients. There is YARN and there is Flink, and 
Flink is YARN inside YARN, in a way. The solution, I think, is that 
cancellation for YARN mode should go through YARN, not through Flink. 
Then there can be no races or other issues with the cluster shutting 
down before it has a chance to send a response.
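
One possible reading of that, sketched with the plain Hadoop YarnClient API
(the class name and the Hadoop 2.8+ ApplicationId.fromString are my
assumptions; this is not how Flink is wired today): the client triggers the
cancellation however it likes and then watches YARN, rather than the Flink
REST server, for the application's final state, so it no longer matters if
the cluster dies before it can answer.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class WaitOnYarnSketch {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());
        yarnClient.start();
        try {
            ApplicationId appId =
                ApplicationId.fromString("application_1559388106022_9412");
            // After asking for the cancellation, observe the outcome via YARN
            // instead of the Flink REST server.
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            while (report.getYarnApplicationState() != YarnApplicationState.FINISHED
                    && report.getYarnApplicationState() != YarnApplicationState.FAILED
                    && report.getYarnApplicationState() != YarnApplicationState.KILLED) {
                Thread.sleep(1000);
                report = yarnClient.getApplicationReport(appId);
            }
            System.out.println("Final status: " + report.getFinalApplicationStatus());
        } finally {
            yarnClient.stop();
        }
    }
}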

Btw, the same goes for "attached mode" where a client waits for job 
completion. IMO, this should also go through YARN and not the Flink REST 
client.

What do you think?

Best,
Aljoscha

On 20.03.20 15:15, Till Rohrmann wrote:
> Yes you are right that `thenAcceptAsync` only breaks the control flow but
> it does not guarantee that the `RestServer` has actually sent the response
> to the client. Maybe we also need something similar to FLINK-10309 [1]. The
> problem I see with this approach is that it makes all RestHandlers stateful.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10309
> 
> Cheers,
> Till
> 
> On Fri, Mar 20, 2020 at 2:26 PM DONG, Weike <ky...@connect.hku.hk> wrote:
> 
>> Hi Tison & Till,
>>
>> Changing *thenAccept *into *thenAcceptAsync *in the
>> MiniDispatcher#cancelJob does not help to solve the problem in my
>> environment. However, I have found that adding a* Thread.sleep(2000) *before
>> the return of JobCancellationHandler#handleRequest solved the problem (at
>> least the symptom goes away). As this is only a dirty hack, I will try to
>> get a more decent solution to this problem.
>>
>> Sincerely,
>> Weike
>>
>> On Tue, Mar 17, 2020 at 11:11 PM tison <wa...@gmail.com> wrote:
>>
>>> JIRA created as https://jira.apache.org/jira/browse/FLINK-16637
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 下午5:57写道:
>>>
>>>>   @Tison could you create an issue to track the problem. Please also link
>>>> the uploaded log file for further debugging.
>>>>
>>>> I think the reason why it worked in Flink 1.9 could have been that we
>>>> had a async callback in the longer chain which broke the flow of execution
>>>> and allowed to send the response. This is no longer the case. As an easy
>>>> fix one could change thenAccept into thenAcceptAsync in the
>>>> MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
>>>> think about allowing not only StatusHandler to close asynchronously. At the
>>>> moment we say that all other handler shut down immediately (see
>>>> AbstractHandler#closeHandlerAsync). But the problem with this change would
>>>> be that all handler would become stateful because they would need to
>>>> remember whether a request is currently ongoing or not.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <ky...@connect.hku.hk>
>>>> wrote:
>>>>
>>>>> Hi Tison & Till and all,
>>>>>
>>>>> I have uploaded the client, taskmanager and jobmanager log to Gist (
>>>>> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f),
>>>>> and I
>>>>> can reproduce this bug every time when trying to cancel Flink 1.10 jobs
>>>>> on
>>>>> YARN.
>>>>>
>>>>> Besides, in earlier Flink versions like 1.9, the REST API for
>>>>> *cancelling
>>>>> job with a savepoint *sometimes throws exceptions to the client side
>>>>> due to
>>>>> early shutdown of the server, even though the savepoint was successfully
>>>>> completed by reviewing the log, however when using the newly introduced
>>>>> *stop* API, that bug disappeared, however, *cancel* API seems to be
>>>>> buggy
>>>>> now.
>>>>>
>>>>> Best,
>>>>> Weike
>>>>>
>>>>> On Tue, Mar 17, 2020 at 10:17 AM tison <wa...@gmail.com> wrote:
>>>>>
>>>>>> edit: previously after the cancellation we have a longer call chain to
>>>>>> #jobReachedGloballyTerminalState which does the archive job & JM
>>>>> graceful
>>>>>> showdown, which might take some time so that ...
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>>
>>>>>> tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:
>>>>>>
>>>>>>> Hi Weike & Till,
>>>>>>>
>>>>>>> I agree with Till and it is also the analysis from my side. However,
>>>>> it
>>>>>>> seems even if we don't have FLINK-15116, it is still possible that we
>>>>>>> complete the cancel future but the cluster got shutdown before it
>>>>> properly
>>>>>>> delivered the response.
>>>>>>>
>>>>>>> There is one thing strange that this behavior almost reproducible, it
>>>>>>> should be a possible order but not always. Maybe previous we have to
>>>>>>> firstly cancel the job which has a long call chain so that it
>>>>> happens we
>>>>>>> have enough time to delivered the response.
>>>>>>>
>>>>>>> But the resolution looks like we introduce some
>>>>>>> synchronization/finalization logics that clear these outstanding
>>>>> future
>>>>>>> with best effort before the cluster(RestServer) down.
>>>>>>>
>>>>>>> Best,
>>>>>>> tison.
>>>>>>>
>>>>>>>
>>>>>>> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
>>>>>>>
>>>>>>>> Hi Weike,
>>>>>>>>
>>>>>>>> could you share the complete logs with us? Attachments are being
>>>>>>>> filtered out by the Apache mail server but it works if you upload
>>>>> the logs
>>>>>>>> somewhere (e.g. https://gist.github.com/) and then share the link
>>>>> with
>>>>>>>> us. Ideally you run the cluster with DEBUG log settings.
>>>>>>>>
>>>>>>>> I assume that you are running Flink 1.10, right?
>>>>>>>>
>>>>>>>> My suspicion is that this behaviour has been introduced with
>>>>> FLINK-15116
>>>>>>>> [1]. It looks as if we complete the shutdown future in
>>>>>>>> MiniDispatcher#cancelJob before we return the response to the
>>>>>>>> RestClusterClient. My guess is that this triggers the shutdown of
>>>>> the
>>>>>>>> RestServer which then is not able to serve the response to the
>>>>> client. I'm
>>>>>>>> pulling in Aljoscha and Tison who introduced this change. They
>>>>> might be
>>>>>>>> able to verify my theory and propose a solution for it.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <
>>>>> kyledong@connect.hku.hk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Yangze and all,
>>>>>>>>>
>>>>>>>>> I have tried numerous times, and this behavior persists.
>>>>>>>>>
>>>>>>>>> Below is the tail log of taskmanager.log:
>>>>>>>>>
>>>>>>>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3]
>>>>> INFO
>>>>>>>>>   org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  -
>>>>> Free slot
>>>>>>>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>>>>>>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>>>>>>>>> (1613968148 bytes), taskOffHeapMemory=0 bytes,
>>>>> managedMemory=1.403gb
>>>>>>>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>>>>>>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>>>>>>>> d0a674795be98bd2574d9ea3286801cb).
>>>>>>>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3]
>>>>> INFO
>>>>>>>>>   org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove
>>>>> job
>>>>>>>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>>>>>>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3]
>>>>> INFO
>>>>>>>>>   org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>>>> JobManager
>>>>>>>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>>>>>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3]
>>>>> INFO
>>>>>>>>>   org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>>>> JobManager
>>>>>>>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>>>>>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3]
>>>>> INFO
>>>>>>>>>   org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot
>>>>> reconnect
>>>>>>>>> to job d0a674795be98bd2574d9ea3286801cb because it is not
>>>>> registered.
>>>>>>>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>>>>>>>   org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>>> 15:
>>>>>>>>> SIGTERM. Shutting down as requested.
>>>>>>>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>>>>>>>   org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>>> 15:
>>>>>>>>> SIGTERM. Shutting down as requested.
>>>>>>>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>>>>>>>>   org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down
>>>>> BLOB
>>>>>>>>> cache
>>>>>>>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle
>>>>> shutdown
>>>>>>>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl
>>>>> -
>>>>>>>>> FileChannelManager removed spill file directory
>>>>>>>>>
>>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>>>>>>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager
>>>>> shutdown
>>>>>>>>> hook] INFO
>>>>>>>>>
>>>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>>>>>>>> Shutting down TaskExecutorLocalStateStoresManager.
>>>>>>>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>>>>>>>>   org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down
>>>>> BLOB
>>>>>>>>> cache
>>>>>>>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook]
>>>>> INFO
>>>>>>>>>   org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>>>>>>>> FileChannelManager removed spill file directory
>>>>>>>>>
>>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>>>>>>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>>>>>>>>   org.apache.flink.runtime.filecache.FileCache  - removed file cache
>>>>>>>>> directory
>>>>>>>>>
>>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>>>>>>>>
>>>>>>>>> As the tail log of jobmanager.log is kind of lengthy, I have
>>>>> attached
>>>>>>>>> it in this mail.
>>>>>>>>>
>>>>>>>>>  From what I have seen, the TaskManager and JobManager shut down by
>>>>>>>>> themselves, however, I have noticed some Netty exceptions (from
>>>>> the stack
>>>>>>>>> trace, it is part of the REST handler) like:
>>>>>>>>>
>>>>>>>>> ERROR
>>>>>>>>>
>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>>>>>>>   - Failed to submit a listener notification task. Event loop shut
>>>>> down?
>>>>>>>>> java.util.concurrent.RejectedExecutionException: event executor
>>>>>>>>> terminated
>>>>>>>>>
>>>>>>>>> Thus I suppose that these exceptions might be the actual cause of
>>>>>>>>> premature termination of the REST server, and I am still looking
>>>>> into the
>>>>>>>>> real cause of this.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Weike
>>>>>>>>>
>>>>>>>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com>
>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Would you mind to share more information about why the task
>>>>> executor
>>>>>>>>>> is killed? If it is killed by Yarn, you might get such info in
>>>>> Yarn
>>>>>>>>>> NM/RM logs.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Yangze Guo
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Yangze Guo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
>>>>> kyledong@connect.hku.hk>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Recently I have encountered a strange behavior of Flink on YARN,
>>>>>>>>>> which is that when I try to cancel a Flink job running in per-job
>>>>> mode on
>>>>>>>>>> YARN using commands like
>>>>>>>>>>>
>>>>>>>>>>> "cancel -m yarn-cluster -yid application_1559388106022_9412
>>>>>>>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>>>>>>>>>>
>>>>>>>>>>> the client happily found and connected to ResourceManager and
>>>>> then
>>>>>>>>>> stucks at
>>>>>>>>>>> Found Web Interface 172.28.28.3:50099 of application
>>>>>>>>>> 'application_1559388106022_9412'.
>>>>>>>>>>>
>>>>>>>>>>> And after one minute, an exception is thrown at the client side:
>>>>>>>>>>> Caused by: org.apache.flink.util.FlinkException: Could not
>>>>> cancel
>>>>>>>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>>>>>>>>>>      at java.security.AccessController.doPrivileged(Native
>>>>> Method)
>>>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>>>>>>>>>      ... 20 more
>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>>>>>>>>>      at
>>>>>>>>>>
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>>>>>>>>>>      ... 27 more
>>>>>>>>>>>
>>>>>>>>>>> Then I discovered that the YARN app has already terminated with
>>>>>>>>>> FINISHED state and KILLED final status, like below.
>>>>>>>>>>>
>>>>>>>>>>> And after digging into the log of this finished YARN app, I have
>>>>>>>>>> found that TaskManager had already received the SIGTERM signal and
>>>>>>>>>> terminated gracefully.
>>>>>>>>>>> org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>>> 15:
>>>>>>>>>> SIGTERM. Shutting down as requested.
>>>>>>>>>>>
>>>>>>>>>>> Also, the log of JobManager shows that it terminated with exit
>>>>> code
>>>>>>>>>> 0.
>>>>>>>>>>> Terminating cluster entrypoint process YarnJobClusterEntrypoint
>>>>> with
>>>>>>>>>> exit code 0
>>>>>>>>>>>
>>>>>>>>>>> However, the JobManager did not return anything to the client
>>>>> before
>>>>>>>>>> its shutdown, which is different from previous versions (like
>>>>> Flink 1.9).
>>>>>>>>>>>
>>>>>>>>>>> I wonder if this is a new bug on the flink-clients or flink-yarn
>>>>>>>>>> module?
>>>>>>>>>>>
>>>>>>>>>>> Thank you : )
>>>>>>>>>>>
>>>>>>>>>>> Sincerely,
>>>>>>>>>>> Weike
>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>
> 


Re: Flink YARN app terminated before the client receives the result

Posted by Till Rohrmann <tr...@apache.org>.
Yes you are right that `thenAcceptAsync` only breaks the control flow but
it does not guarantee that the `RestServer` has actually sent the response
to the client. Maybe we also need something similar to FLINK-10309 [1]. The
problem I see with this approach is that it makes all RestHandlers stateful.

[1] https://issues.apache.org/jira/browse/FLINK-10309
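
To make the race concrete, here is a minimal JDK-only sketch (all names are
made up; this is not the actual dispatcher or RestServer code). Completing
the termination future in the same chain that is supposed to deliver the
response means nothing orders the response write before the server is torn
down, no matter whether thenAccept or thenAcceptAsync is used:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownRaceSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService restServer = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> cancelDone = new CompletableFuture<>();

        // Stand-in for the handler: once cancellation completes, write the
        // response on the REST server's thread.
        cancelDone.thenRunAsync(() -> {
            try {
                Thread.sleep(100); // simulate writing the response to the client
            } catch (InterruptedException e) {
                return; // the server shut down while we were still writing
            }
            System.out.println("response sent");
        }, restServer);

        // The cancel operation finishes ...
        cancelDone.complete(null);

        // ... and because the termination future is completed in the same
        // chain, the entrypoint tears the REST server down right away.
        // thenAccept vs thenAcceptAsync only changes which thread runs the
        // callback, not whether the response has been written by this point.
        restServer.shutdownNow();

        Thread.sleep(500);
        // "response sent" is never printed: the write was still in flight
        // when the server went away, which is what the CLI observes.
    }
}

In other words, the fix has to make the shutdown wait until the outstanding
response has actually been written, and that is exactly what forces the
handlers to become stateful.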

Cheers,
Till

On Fri, Mar 20, 2020 at 2:26 PM DONG, Weike <ky...@connect.hku.hk> wrote:

> Hi Tison & Till,
>
> Changing *thenAccept *into *thenAcceptAsync *in the
> MiniDispatcher#cancelJob does not help to solve the problem in my
> environment. However, I have found that adding a* Thread.sleep(2000) *before
> the return of JobCancellationHandler#handleRequest solved the problem (at
> least the symptom goes away). As this is only a dirty hack, I will try to
> get a more decent solution to this problem.
>
> Sincerely,
> Weike
>
> On Tue, Mar 17, 2020 at 11:11 PM tison <wa...@gmail.com> wrote:
>
>> JIRA created as https://jira.apache.org/jira/browse/FLINK-16637
>>
>> Best,
>> tison.
>>
>>
>> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 下午5:57写道:
>>
>>>  @Tison could you create an issue to track the problem. Please also link
>>> the uploaded log file for further debugging.
>>>
>>> I think the reason why it worked in Flink 1.9 could have been that we
>>> had a async callback in the longer chain which broke the flow of execution
>>> and allowed to send the response. This is no longer the case. As an easy
>>> fix one could change thenAccept into thenAcceptAsync in the
>>> MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
>>> think about allowing not only StatusHandler to close asynchronously. At the
>>> moment we say that all other handler shut down immediately (see
>>> AbstractHandler#closeHandlerAsync). But the problem with this change would
>>> be that all handler would become stateful because they would need to
>>> remember whether a request is currently ongoing or not.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <ky...@connect.hku.hk>
>>> wrote:
>>>
>>>> Hi Tison & Till and all,
>>>>
>>>> I have uploaded the client, taskmanager and jobmanager log to Gist (
>>>> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f),
>>>> and I
>>>> can reproduce this bug every time when trying to cancel Flink 1.10 jobs
>>>> on
>>>> YARN.
>>>>
>>>> Besides, in earlier Flink versions like 1.9, the REST API for
>>>> *cancelling
>>>> job with a savepoint *sometimes throws exceptions to the client side
>>>> due to
>>>> early shutdown of the server, even though the savepoint was successfully
>>>> completed by reviewing the log, however when using the newly introduced
>>>> *stop* API, that bug disappeared, however, *cancel* API seems to be
>>>> buggy
>>>> now.
>>>>
>>>> Best,
>>>> Weike
>>>>
>>>> On Tue, Mar 17, 2020 at 10:17 AM tison <wa...@gmail.com> wrote:
>>>>
>>>> > edit: previously after the cancellation we have a longer call chain to
>>>> > #jobReachedGloballyTerminalState which does the archive job & JM
>>>> graceful
>>>> > showdown, which might take some time so that ...
>>>> >
>>>> > Best,
>>>> > tison.
>>>> >
>>>> >
>>>> > tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:
>>>> >
>>>> >> Hi Weike & Till,
>>>> >>
>>>> >> I agree with Till and it is also the analysis from my side. However,
>>>> it
>>>> >> seems even if we don't have FLINK-15116, it is still possible that we
>>>> >> complete the cancel future but the cluster got shutdown before it
>>>> properly
>>>> >> delivered the response.
>>>> >>
>>>> >> There is one thing strange that this behavior almost reproducible, it
>>>> >> should be a possible order but not always. Maybe previous we have to
>>>> >> firstly cancel the job which has a long call chain so that it
>>>> happens we
>>>> >> have enough time to delivered the response.
>>>> >>
>>>> >> But the resolution looks like we introduce some
>>>> >> synchronization/finalization logics that clear these outstanding
>>>> future
>>>> >> with best effort before the cluster(RestServer) down.
>>>> >>
>>>> >> Best,
>>>> >> tison.
>>>> >>
>>>> >>
>>>> >> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
>>>> >>
>>>> >>> Hi Weike,
>>>> >>>
>>>> >>> could you share the complete logs with us? Attachments are being
>>>> >>> filtered out by the Apache mail server but it works if you upload
>>>> the logs
>>>> >>> somewhere (e.g. https://gist.github.com/) and then share the link
>>>> with
>>>> >>> us. Ideally you run the cluster with DEBUG log settings.
>>>> >>>
>>>> >>> I assume that you are running Flink 1.10, right?
>>>> >>>
>>>> >>> My suspicion is that this behaviour has been introduced with
>>>> FLINK-15116
>>>> >>> [1]. It looks as if we complete the shutdown future in
>>>> >>> MiniDispatcher#cancelJob before we return the response to the
>>>> >>> RestClusterClient. My guess is that this triggers the shutdown of
>>>> the
>>>> >>> RestServer which then is not able to serve the response to the
>>>> client. I'm
>>>> >>> pulling in Aljoscha and Tison who introduced this change. They
>>>> might be
>>>> >>> able to verify my theory and propose a solution for it.
>>>> >>>
>>>> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>>> >>>
>>>> >>> Cheers,
>>>> >>> Till
>>>> >>>
>>>> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <
>>>> kyledong@connect.hku.hk>
>>>> >>> wrote:
>>>> >>>
>>>> >>>> Hi Yangze and all,
>>>> >>>>
>>>> >>>> I have tried numerous times, and this behavior persists.
>>>> >>>>
>>>> >>>> Below is the tail log of taskmanager.log:
>>>> >>>>
>>>> >>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  -
>>>> Free slot
>>>> >>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>>> >>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>>>> >>>> (1613968148 bytes), taskOffHeapMemory=0 bytes,
>>>> managedMemory=1.403gb
>>>> >>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>>> >>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>>> >>>> d0a674795be98bd2574d9ea3286801cb).
>>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove
>>>> job
>>>> >>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>>> JobManager
>>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>>> JobManager
>>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot
>>>> reconnect
>>>> >>>> to job d0a674795be98bd2574d9ea3286801cb because it is not
>>>> registered.
>>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>> 15:
>>>> >>>> SIGTERM. Shutting down as requested.
>>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>> 15:
>>>> >>>> SIGTERM. Shutting down as requested.
>>>> >>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>>> >>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down
>>>> BLOB
>>>> >>>> cache
>>>> >>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle
>>>> shutdown
>>>> >>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl
>>>> -
>>>> >>>> FileChannelManager removed spill file directory
>>>> >>>>
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>>> >>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager
>>>> shutdown
>>>> >>>> hook] INFO
>>>> >>>>
>>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>>> >>>> Shutting down TaskExecutorLocalStateStoresManager.
>>>> >>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>>> >>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down
>>>> BLOB
>>>> >>>> cache
>>>> >>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook]
>>>> INFO
>>>> >>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>>> >>>> FileChannelManager removed spill file directory
>>>> >>>>
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>>> >>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>>> >>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>>>> >>>> directory
>>>> >>>>
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>>> >>>>
>>>> >>>> As the tail log of jobmanager.log is kind of lengthy, I have
>>>> attached
>>>> >>>> it in this mail.
>>>> >>>>
>>>> >>>> From what I have seen, the TaskManager and JobManager shut down by
>>>> >>>> themselves, however, I have noticed some Netty exceptions (from
>>>> the stack
>>>> >>>> trace, it is part of the REST handler) like:
>>>> >>>>
>>>> >>>> ERROR
>>>> >>>>
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>> >>>>  - Failed to submit a listener notification task. Event loop shut
>>>> down?
>>>> >>>> java.util.concurrent.RejectedExecutionException: event executor
>>>> >>>> terminated
>>>> >>>>
>>>> >>>> Thus I suppose that these exceptions might be the actual cause of
>>>> >>>> premature termination of the REST server, and I am still looking
>>>> into the
>>>> >>>> real cause of this.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Weike
>>>> >>>>
>>>> >>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com>
>>>> wrote:
>>>> >>>>
>>>> >>>>> Would you mind to share more information about why the task
>>>> executor
>>>> >>>>> is killed? If it is killed by Yarn, you might get such info in
>>>> Yarn
>>>> >>>>> NM/RM logs.
>>>> >>>>>
>>>> >>>>> Best,
>>>> >>>>> Yangze Guo
>>>> >>>>>
>>>> >>>>> Best,
>>>> >>>>> Yangze Guo
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
>>>> kyledong@connect.hku.hk>
>>>> >>>>> wrote:
>>>> >>>>> >
>>>> >>>>> > Hi,
>>>> >>>>> >
>>>> >>>>> > Recently I have encountered a strange behavior of Flink on YARN,
>>>> >>>>> which is that when I try to cancel a Flink job running in per-job
>>>> mode on
>>>> >>>>> YARN using commands like
>>>> >>>>> >
>>>> >>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>>> >>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>>> >>>>> >
>>>> >>>>> > the client happily found and connected to ResourceManager and
>>>> then
>>>> >>>>> stucks at
>>>> >>>>> > Found Web Interface 172.28.28.3:50099 of application
>>>> >>>>> 'application_1559388106022_9412'.
>>>> >>>>> >
>>>> >>>>> > And after one minute, an exception is thrown at the client side:
>>>> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not
>>>> cancel
>>>> >>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>>> >>>>> >     at java.security.AccessController.doPrivileged(Native
>>>> Method)
>>>> >>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>> >>>>> >     ... 20 more
>>>> >>>>> > Caused by: java.util.concurrent.TimeoutException
>>>> >>>>> >     at
>>>> >>>>>
>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>> >>>>> >     at
>>>> >>>>>
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>> >>>>> >     at
>>>> >>>>>
>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>>> >>>>> >     ... 27 more
>>>> >>>>> >
>>>> >>>>> > Then I discovered that the YARN app has already terminated with
>>>> >>>>> FINISHED state and KILLED final status, like below.
>>>> >>>>> >
>>>> >>>>> > And after digging into the log of this finished YARN app, I have
>>>> >>>>> found that TaskManager had already received the SIGTERM signal and
>>>> >>>>> terminated gracefully.
>>>> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>>> 15:
>>>> >>>>> SIGTERM. Shutting down as requested.
>>>> >>>>> >
>>>> >>>>> > Also, the log of JobManager shows that it terminated with exit
>>>> code
>>>> >>>>> 0.
>>>> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint
>>>> with
>>>> >>>>> exit code 0
>>>> >>>>> >
>>>> >>>>> > However, the JobManager did not return anything to the client
>>>> before
>>>> >>>>> its shutdown, which is different from previous versions (like
>>>> Flink 1.9).
>>>> >>>>> >
>>>> >>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>>> >>>>> module?
>>>> >>>>> >
>>>> >>>>> > Thank you : )
>>>> >>>>> >
>>>> >>>>> > Sincerely,
>>>> >>>>> > Weike
>>>> >>>>>
>>>> >>>>
>>>>
>>>

Re: Flink YARN app terminated before the client receives the result

Posted by "DONG, Weike" <ky...@connect.hku.hk>.
Hi Tison & Till,

Changing thenAccept into thenAcceptAsync in MiniDispatcher#cancelJob does
not help to solve the problem in my environment. However, I have found that
adding a Thread.sleep(2000) before the return of
JobCancellationHandler#handleRequest solved the problem (at least the
symptom goes away). As this is only a dirty hack, I will try to get a more
decent solution to this problem.
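
For the record, one possible shape of such a fix, along the lines Till and
tison describe (all names hypothetical, not Flink API): instead of sleeping,
track the in-flight responses and let the shutdown wait for them.

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not Flink code: remember in-flight responses so that
// the REST server shutdown can wait for them instead of a fixed sleep.
public class InFlightResponses {

    private final Set<CompletableFuture<?>> inFlight = ConcurrentHashMap.newKeySet();

    // A handler registers the response future it hands back.
    public <T> CompletableFuture<T> track(CompletableFuture<T> response) {
        inFlight.add(response);
        response.whenComplete((result, error) -> inFlight.remove(response));
        return response;
    }

    // Called before tearing the RestServer down: best-effort wait until all
    // outstanding responses have been written.
    public CompletableFuture<Void> drain() {
        return CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0]));
    }
}

This is essentially the statefulness Till mentions: each handler would have
to remember whether a request is still ongoing.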

Sincerely,
Weike

On Tue, Mar 17, 2020 at 11:11 PM tison <wa...@gmail.com> wrote:

> JIRA created as https://jira.apache.org/jira/browse/FLINK-16637
>
> Best,
> tison.
>
>
> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 下午5:57写道:
>
>>  @Tison could you create an issue to track the problem. Please also link
>> the uploaded log file for further debugging.
>>
>> I think the reason why it worked in Flink 1.9 could have been that we had
>> a async callback in the longer chain which broke the flow of execution and
>> allowed to send the response. This is no longer the case. As an easy fix
>> one could change thenAccept into thenAcceptAsync in the
>> MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
>> think about allowing not only StatusHandler to close asynchronously. At the
>> moment we say that all other handler shut down immediately (see
>> AbstractHandler#closeHandlerAsync). But the problem with this change would
>> be that all handler would become stateful because they would need to
>> remember whether a request is currently ongoing or not.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <ky...@connect.hku.hk>
>> wrote:
>>
>>> Hi Tison & Till and all,
>>>
>>> I have uploaded the client, taskmanager and jobmanager log to Gist (
>>> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and
>>> I
>>> can reproduce this bug every time when trying to cancel Flink 1.10 jobs
>>> on
>>> YARN.
>>>
>>> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling
>>> job with a savepoint *sometimes throws exceptions to the client side due
>>> to
>>> early shutdown of the server, even though the savepoint was successfully
>>> completed by reviewing the log, however when using the newly introduced
>>> *stop* API, that bug disappeared, however, *cancel* API seems to be buggy
>>> now.
>>>
>>> Best,
>>> Weike
>>>
>>> On Tue, Mar 17, 2020 at 10:17 AM tison <wa...@gmail.com> wrote:
>>>
>>> > edit: previously after the cancellation we have a longer call chain to
>>> > #jobReachedGloballyTerminalState which does the archive job & JM
>>> graceful
>>> > showdown, which might take some time so that ...
>>> >
>>> > Best,
>>> > tison.
>>> >
>>> >
>>> > tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:
>>> >
>>> >> Hi Weike & Till,
>>> >>
>>> >> I agree with Till and it is also the analysis from my side. However,
>>> it
>>> >> seems even if we don't have FLINK-15116, it is still possible that we
>>> >> complete the cancel future but the cluster got shutdown before it
>>> properly
>>> >> delivered the response.
>>> >>
>>> >> There is one thing strange that this behavior almost reproducible, it
>>> >> should be a possible order but not always. Maybe previous we have to
>>> >> firstly cancel the job which has a long call chain so that it happens
>>> we
>>> >> have enough time to delivered the response.
>>> >>
>>> >> But the resolution looks like we introduce some
>>> >> synchronization/finalization logics that clear these outstanding
>>> future
>>> >> with best effort before the cluster(RestServer) down.
>>> >>
>>> >> Best,
>>> >> tison.
>>> >>
>>> >>
>>> >> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
>>> >>
>>> >>> Hi Weike,
>>> >>>
>>> >>> could you share the complete logs with us? Attachments are being
>>> >>> filtered out by the Apache mail server but it works if you upload
>>> the logs
>>> >>> somewhere (e.g. https://gist.github.com/) and then share the link
>>> with
>>> >>> us. Ideally you run the cluster with DEBUG log settings.
>>> >>>
>>> >>> I assume that you are running Flink 1.10, right?
>>> >>>
>>> >>> My suspicion is that this behaviour has been introduced with
>>> FLINK-15116
>>> >>> [1]. It looks as if we complete the shutdown future in
>>> >>> MiniDispatcher#cancelJob before we return the response to the
>>> >>> RestClusterClient. My guess is that this triggers the shutdown of the
>>> >>> RestServer which then is not able to serve the response to the
>>> client. I'm
>>> >>> pulling in Aljoscha and Tison who introduced this change. They might
>>> be
>>> >>> able to verify my theory and propose a solution for it.
>>> >>>
>>> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>> >>>
>>> >>> Cheers,
>>> >>> Till
>>> >>>
>>> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <kyledong@connect.hku.hk
>>> >
>>> >>> wrote:
>>> >>>
>>> >>>> Hi Yangze and all,
>>> >>>>
>>> >>>> I have tried numerous times, and this behavior persists.
>>> >>>>
>>> >>>> Below is the tail log of taskmanager.log:
>>> >>>>
>>> >>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  -
>>> Free slot
>>> >>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>> >>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>>> >>>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>>> >>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>> >>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>> >>>> d0a674795be98bd2574d9ea3286801cb).
>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove
>>> job
>>> >>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>> JobManager
>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>> JobManager
>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot
>>> reconnect
>>> >>>> to job d0a674795be98bd2574d9ea3286801cb because it is not
>>> registered.
>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> >>>> SIGTERM. Shutting down as requested.
>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> >>>> SIGTERM. Shutting down as requested.
>>> >>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>> >>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down
>>> BLOB
>>> >>>> cache
>>> >>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle
>>> shutdown
>>> >>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl
>>> -
>>> >>>> FileChannelManager removed spill file directory
>>> >>>>
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>> >>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager
>>> shutdown
>>> >>>> hook] INFO
>>> >>>>
>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>> >>>> Shutting down TaskExecutorLocalStateStoresManager.
>>> >>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>> >>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down
>>> BLOB
>>> >>>> cache
>>> >>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook]
>>> INFO
>>> >>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>> >>>> FileChannelManager removed spill file directory
>>> >>>>
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>> >>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>> >>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>>> >>>> directory
>>> >>>>
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>> >>>>
>>> >>>> As the tail log of jobmanager.log is kind of lengthy, I have
>>> attached
>>> >>>> it in this mail.
>>> >>>>
>>> >>>> From what I have seen, the TaskManager and JobManager shut down by
>>> >>>> themselves, however, I have noticed some Netty exceptions (from the
>>> stack
>>> >>>> trace, it is part of the REST handler) like:
>>> >>>>
>>> >>>> ERROR
>>> >>>>
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>> >>>>  - Failed to submit a listener notification task. Event loop shut
>>> down?
>>> >>>> java.util.concurrent.RejectedExecutionException: event executor
>>> >>>> terminated
>>> >>>>
>>> >>>> Thus I suppose that these exceptions might be the actual cause of
>>> >>>> premature termination of the REST server, and I am still looking
>>> into the
>>> >>>> real cause of this.
>>> >>>>
>>> >>>> Best,
>>> >>>> Weike
>>> >>>>
>>> >>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com>
>>> wrote:
>>> >>>>
>>> >>>>> Would you mind to share more information about why the task
>>> executor
>>> >>>>> is killed? If it is killed by Yarn, you might get such info in Yarn
>>> >>>>> NM/RM logs.
>>> >>>>>
>>> >>>>> Best,
>>> >>>>> Yangze Guo
>>> >>>>>
>>> >>>>> Best,
>>> >>>>> Yangze Guo
>>> >>>>>
>>> >>>>>
>>> >>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
>>> kyledong@connect.hku.hk>
>>> >>>>> wrote:
>>> >>>>> >
>>> >>>>> > Hi,
>>> >>>>> >
>>> >>>>> > Recently I have encountered a strange behavior of Flink on YARN,
>>> >>>>> which is that when I try to cancel a Flink job running in per-job
>>> mode on
>>> >>>>> YARN using commands like
>>> >>>>> >
>>> >>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>> >>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>> >>>>> >
>>> >>>>> > the client happily found and connected to ResourceManager and
>>> then
>>> >>>>> stucks at
>>> >>>>> > Found Web Interface 172.28.28.3:50099 of application
>>> >>>>> 'application_1559388106022_9412'.
>>> >>>>> >
>>> >>>>> > And after one minute, an exception is thrown at the client side:
>>> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel
>>> >>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>> >>>>> >     at java.security.AccessController.doPrivileged(Native Method)
>>> >>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>> >>>>> >     ... 20 more
>>> >>>>> > Caused by: java.util.concurrent.TimeoutException
>>> >>>>> >     at
>>> >>>>>
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>> >>>>> >     at
>>> >>>>>
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>> >>>>> >     ... 27 more
>>> >>>>> >
>>> >>>>> > Then I discovered that the YARN app has already terminated with
>>> >>>>> FINISHED state and KILLED final status, like below.
>>> >>>>> >
>>> >>>>> > And after digging into the log of this finished YARN app, I have
>>> >>>>> found that TaskManager had already received the SIGTERM signal and
>>> >>>>> terminated gracefully.
>>> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>> 15:
>>> >>>>> SIGTERM. Shutting down as requested.
>>> >>>>> >
>>> >>>>> > Also, the log of JobManager shows that it terminated with exit
>>> code
>>> >>>>> 0.
>>> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint
>>> with
>>> >>>>> exit code 0
>>> >>>>> >
>>> >>>>> > However, the JobManager did not return anything to the client
>>> before
>>> >>>>> its shutdown, which is different from previous versions (like
>>> Flink 1.9).
>>> >>>>> >
>>> >>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>> >>>>> module?
>>> >>>>> >
>>> >>>>> > Thank you : )
>>> >>>>> >
>>> >>>>> > Sincerely,
>>> >>>>> > Weike
>>> >>>>>
>>> >>>>
>>>
>>

Re: Flink YARN app terminated before the client receives the result

Posted by "DONG, Weike" <ky...@connect.hku.hk>.
Hi Tison & Till,

Changing *thenAccept* into *thenAcceptAsync* in
MiniDispatcher#cancelJob does not solve the problem in my
environment. However, I have found that adding a *Thread.sleep(2000)* before
the return of JobCancellationHandler#handleRequest makes the problem go away
(at least the symptom does). As this is only a dirty hack, I will keep
looking for a more decent solution.
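
For reference, the idea behind the hack is roughly the following. This is
only a simplified, self-contained sketch and *not* the actual Flink source;
the class and method names below are made up for illustration, with
handleRequest merely standing in for JobCancellationHandler#handleRequest:

import java.util.concurrent.CompletableFuture;

// Sketch of the dirty hack: delay the handler's return so that the REST
// server (hopefully) flushes the response before the shutdown triggered by
// the cancellation tears it down. Hypothetical names, not Flink code.
public class DelayedCancelHandlerSketch {

    // Stands in for JobCancellationHandler#handleRequest.
    static CompletableFuture<String> handleRequest(String jobId) {
        CompletableFuture<String> cancellation = cancelJob(jobId);
        try {
            // Dirty hack: sleep ~2 seconds before returning.
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return cancellation;
    }

    // Stands in for the dispatcher-side cancellation call.
    static CompletableFuture<String> cancelJob(String jobId) {
        return CompletableFuture.completedFuture("Acknowledge");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handleRequest("ed7e2e0ab0a7316c1b65df6047bc6aae").get());
    }
}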

Sincerely,
Weike

On Tue, Mar 17, 2020 at 11:11 PM tison <wa...@gmail.com> wrote:

> JIRA created as https://jira.apache.org/jira/browse/FLINK-16637
>
> Best,
> tison.
>
>
> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 下午5:57写道:
>
>>  @Tison could you create an issue to track the problem. Please also link
>> the uploaded log file for further debugging.
>>
>> I think the reason why it worked in Flink 1.9 could have been that we had
>> a async callback in the longer chain which broke the flow of execution and
>> allowed to send the response. This is no longer the case. As an easy fix
>> one could change thenAccept into thenAcceptAsync in the
>> MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
>> think about allowing not only StatusHandler to close asynchronously. At the
>> moment we say that all other handler shut down immediately (see
>> AbstractHandler#closeHandlerAsync). But the problem with this change would
>> be that all handler would become stateful because they would need to
>> remember whether a request is currently ongoing or not.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <ky...@connect.hku.hk>
>> wrote:
>>
>>> Hi Tison & Till and all,
>>>
>>> I have uploaded the client, taskmanager and jobmanager log to Gist (
>>> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and
>>> I
>>> can reproduce this bug every time when trying to cancel Flink 1.10 jobs
>>> on
>>> YARN.
>>>
>>> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling
>>> job with a savepoint *sometimes throws exceptions to the client side due
>>> to
>>> early shutdown of the server, even though the savepoint was successfully
>>> completed by reviewing the log, however when using the newly introduced
>>> *stop* API, that bug disappeared, however, *cancel* API seems to be buggy
>>> now.
>>>
>>> Best,
>>> Weike
>>>
>>> On Tue, Mar 17, 2020 at 10:17 AM tison <wa...@gmail.com> wrote:
>>>
>>> > edit: previously after the cancellation we have a longer call chain to
>>> > #jobReachedGloballyTerminalState which does the archive job & JM
>>> graceful
>>> > showdown, which might take some time so that ...
>>> >
>>> > Best,
>>> > tison.
>>> >
>>> >
>>> > tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:
>>> >
>>> >> Hi Weike & Till,
>>> >>
>>> >> I agree with Till and it is also the analysis from my side. However,
>>> it
>>> >> seems even if we don't have FLINK-15116, it is still possible that we
>>> >> complete the cancel future but the cluster got shutdown before it
>>> properly
>>> >> delivered the response.
>>> >>
>>> >> There is one thing strange that this behavior almost reproducible, it
>>> >> should be a possible order but not always. Maybe previous we have to
>>> >> firstly cancel the job which has a long call chain so that it happens
>>> we
>>> >> have enough time to delivered the response.
>>> >>
>>> >> But the resolution looks like we introduce some
>>> >> synchronization/finalization logics that clear these outstanding
>>> future
>>> >> with best effort before the cluster(RestServer) down.
>>> >>
>>> >> Best,
>>> >> tison.
>>> >>
>>> >>
>>> >> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
>>> >>
>>> >>> Hi Weike,
>>> >>>
>>> >>> could you share the complete logs with us? Attachments are being
>>> >>> filtered out by the Apache mail server but it works if you upload
>>> the logs
>>> >>> somewhere (e.g. https://gist.github.com/) and then share the link
>>> with
>>> >>> us. Ideally you run the cluster with DEBUG log settings.
>>> >>>
>>> >>> I assume that you are running Flink 1.10, right?
>>> >>>
>>> >>> My suspicion is that this behaviour has been introduced with
>>> FLINK-15116
>>> >>> [1]. It looks as if we complete the shutdown future in
>>> >>> MiniDispatcher#cancelJob before we return the response to the
>>> >>> RestClusterClient. My guess is that this triggers the shutdown of the
>>> >>> RestServer which then is not able to serve the response to the
>>> client. I'm
>>> >>> pulling in Aljoscha and Tison who introduced this change. They might
>>> be
>>> >>> able to verify my theory and propose a solution for it.
>>> >>>
>>> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>> >>>
>>> >>> Cheers,
>>> >>> Till
>>> >>>
>>> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <kyledong@connect.hku.hk
>>> >
>>> >>> wrote:
>>> >>>
>>> >>>> Hi Yangze and all,
>>> >>>>
>>> >>>> I have tried numerous times, and this behavior persists.
>>> >>>>
>>> >>>> Below is the tail log of taskmanager.log:
>>> >>>>
>>> >>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  -
>>> Free slot
>>> >>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>> >>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>>> >>>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>>> >>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>> >>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>> >>>> d0a674795be98bd2574d9ea3286801cb).
>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove
>>> job
>>> >>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>> JobManager
>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>>> JobManager
>>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot
>>> reconnect
>>> >>>> to job d0a674795be98bd2574d9ea3286801cb because it is not
>>> registered.
>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> >>>> SIGTERM. Shutting down as requested.
>>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> >>>> SIGTERM. Shutting down as requested.
>>> >>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>> >>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down
>>> BLOB
>>> >>>> cache
>>> >>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle
>>> shutdown
>>> >>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl
>>> -
>>> >>>> FileChannelManager removed spill file directory
>>> >>>>
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>> >>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager
>>> shutdown
>>> >>>> hook] INFO
>>> >>>>
>>> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>> >>>> Shutting down TaskExecutorLocalStateStoresManager.
>>> >>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>> >>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down
>>> BLOB
>>> >>>> cache
>>> >>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook]
>>> INFO
>>> >>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>> >>>> FileChannelManager removed spill file directory
>>> >>>>
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>> >>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>> >>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>>> >>>> directory
>>> >>>>
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>> >>>>
>>> >>>> As the tail log of jobmanager.log is kind of lengthy, I have
>>> attached
>>> >>>> it in this mail.
>>> >>>>
>>> >>>> From what I have seen, the TaskManager and JobManager shut down by
>>> >>>> themselves, however, I have noticed some Netty exceptions (from the
>>> stack
>>> >>>> trace, it is part of the REST handler) like:
>>> >>>>
>>> >>>> ERROR
>>> >>>>
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>> >>>>  - Failed to submit a listener notification task. Event loop shut
>>> down?
>>> >>>> java.util.concurrent.RejectedExecutionException: event executor
>>> >>>> terminated
>>> >>>>
>>> >>>> Thus I suppose that these exceptions might be the actual cause of
>>> >>>> premature termination of the REST server, and I am still looking
>>> into the
>>> >>>> real cause of this.
>>> >>>>
>>> >>>> Best,
>>> >>>> Weike
>>> >>>>
>>> >>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com>
>>> wrote:
>>> >>>>
>>> >>>>> Would you mind to share more information about why the task
>>> executor
>>> >>>>> is killed? If it is killed by Yarn, you might get such info in Yarn
>>> >>>>> NM/RM logs.
>>> >>>>>
>>> >>>>> Best,
>>> >>>>> Yangze Guo
>>> >>>>>
>>> >>>>> Best,
>>> >>>>> Yangze Guo
>>> >>>>>
>>> >>>>>
>>> >>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
>>> kyledong@connect.hku.hk>
>>> >>>>> wrote:
>>> >>>>> >
>>> >>>>> > Hi,
>>> >>>>> >
>>> >>>>> > Recently I have encountered a strange behavior of Flink on YARN,
>>> >>>>> which is that when I try to cancel a Flink job running in per-job
>>> mode on
>>> >>>>> YARN using commands like
>>> >>>>> >
>>> >>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>> >>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>> >>>>> >
>>> >>>>> > the client happily found and connected to ResourceManager and
>>> then
>>> >>>>> stucks at
>>> >>>>> > Found Web Interface 172.28.28.3:50099 of application
>>> >>>>> 'application_1559388106022_9412'.
>>> >>>>> >
>>> >>>>> > And after one minute, an exception is thrown at the client side:
>>> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel
>>> >>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>> >>>>> >     at java.security.AccessController.doPrivileged(Native Method)
>>> >>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>> >>>>> >     ... 20 more
>>> >>>>> > Caused by: java.util.concurrent.TimeoutException
>>> >>>>> >     at
>>> >>>>>
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>> >>>>> >     at
>>> >>>>>
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>> >>>>> >     at
>>> >>>>>
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>> >>>>> >     ... 27 more
>>> >>>>> >
>>> >>>>> > Then I discovered that the YARN app has already terminated with
>>> >>>>> FINISHED state and KILLED final status, like below.
>>> >>>>> >
>>> >>>>> > And after digging into the log of this finished YARN app, I have
>>> >>>>> found that TaskManager had already received the SIGTERM signal and
>>> >>>>> terminated gracefully.
>>> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>>> 15:
>>> >>>>> SIGTERM. Shutting down as requested.
>>> >>>>> >
>>> >>>>> > Also, the log of JobManager shows that it terminated with exit
>>> code
>>> >>>>> 0.
>>> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint
>>> with
>>> >>>>> exit code 0
>>> >>>>> >
>>> >>>>> > However, the JobManager did not return anything to the client
>>> before
>>> >>>>> its shutdown, which is different from previous versions (like
>>> Flink 1.9).
>>> >>>>> >
>>> >>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>> >>>>> module?
>>> >>>>> >
>>> >>>>> > Thank you : )
>>> >>>>> >
>>> >>>>> > Sincerely,
>>> >>>>> > Weike
>>> >>>>>
>>> >>>>
>>>
>>

Re: Flink YARN app terminated before the client receives the result

Posted by tison <wa...@gmail.com>.
JIRA created as https://jira.apache.org/jira/browse/FLINK-16637

Best,
tison.


Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 下午5:57写道:

>  @Tison could you create an issue to track the problem. Please also link
> the uploaded log file for further debugging.
>
> I think the reason why it worked in Flink 1.9 could have been that we had
> a async callback in the longer chain which broke the flow of execution and
> allowed to send the response. This is no longer the case. As an easy fix
> one could change thenAccept into thenAcceptAsync in the
> MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
> think about allowing not only StatusHandler to close asynchronously. At the
> moment we say that all other handler shut down immediately (see
> AbstractHandler#closeHandlerAsync). But the problem with this change would
> be that all handler would become stateful because they would need to
> remember whether a request is currently ongoing or not.
>
> Cheers,
> Till
>
> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <ky...@connect.hku.hk>
> wrote:
>
>> Hi Tison & Till and all,
>>
>> I have uploaded the client, taskmanager and jobmanager log to Gist (
>> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I
>> can reproduce this bug every time when trying to cancel Flink 1.10 jobs on
>> YARN.
>>
>> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling
>> job with a savepoint *sometimes throws exceptions to the client side due
>> to
>> early shutdown of the server, even though the savepoint was successfully
>> completed by reviewing the log, however when using the newly introduced
>> *stop* API, that bug disappeared, however, *cancel* API seems to be buggy
>> now.
>>
>> Best,
>> Weike
>>
>> On Tue, Mar 17, 2020 at 10:17 AM tison <wa...@gmail.com> wrote:
>>
>> > edit: previously after the cancellation we have a longer call chain to
>> > #jobReachedGloballyTerminalState which does the archive job & JM
>> graceful
>> > showdown, which might take some time so that ...
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:
>> >
>> >> Hi Weike & Till,
>> >>
>> >> I agree with Till and it is also the analysis from my side. However, it
>> >> seems even if we don't have FLINK-15116, it is still possible that we
>> >> complete the cancel future but the cluster got shutdown before it
>> properly
>> >> delivered the response.
>> >>
>> >> There is one thing strange that this behavior almost reproducible, it
>> >> should be a possible order but not always. Maybe previous we have to
>> >> firstly cancel the job which has a long call chain so that it happens
>> we
>> >> have enough time to delivered the response.
>> >>
>> >> But the resolution looks like we introduce some
>> >> synchronization/finalization logics that clear these outstanding future
>> >> with best effort before the cluster(RestServer) down.
>> >>
>> >> Best,
>> >> tison.
>> >>
>> >>
>> >> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
>> >>
>> >>> Hi Weike,
>> >>>
>> >>> could you share the complete logs with us? Attachments are being
>> >>> filtered out by the Apache mail server but it works if you upload the
>> logs
>> >>> somewhere (e.g. https://gist.github.com/) and then share the link
>> with
>> >>> us. Ideally you run the cluster with DEBUG log settings.
>> >>>
>> >>> I assume that you are running Flink 1.10, right?
>> >>>
>> >>> My suspicion is that this behaviour has been introduced with
>> FLINK-15116
>> >>> [1]. It looks as if we complete the shutdown future in
>> >>> MiniDispatcher#cancelJob before we return the response to the
>> >>> RestClusterClient. My guess is that this triggers the shutdown of the
>> >>> RestServer which then is not able to serve the response to the
>> client. I'm
>> >>> pulling in Aljoscha and Tison who introduced this change. They might
>> be
>> >>> able to verify my theory and propose a solution for it.
>> >>>
>> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>> >>>
>> >>> Cheers,
>> >>> Till
>> >>>
>> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <ky...@connect.hku.hk>
>> >>> wrote:
>> >>>
>> >>>> Hi Yangze and all,
>> >>>>
>> >>>> I have tried numerous times, and this behavior persists.
>> >>>>
>> >>>> Below is the tail log of taskmanager.log:
>> >>>>
>> >>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>> >>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  -
>> Free slot
>> >>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>> >>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>> >>>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>> >>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>> >>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>> >>>> d0a674795be98bd2574d9ea3286801cb).
>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>> >>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>> JobManager
>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
>> JobManager
>> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot
>> reconnect
>> >>>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> >>>> SIGTERM. Shutting down as requested.
>> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> >>>> SIGTERM. Shutting down as requested.
>> >>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>> >>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down
>> BLOB
>> >>>> cache
>> >>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle
>> shutdown
>> >>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl
>> -
>> >>>> FileChannelManager removed spill file directory
>> >>>>
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>> >>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
>> >>>> hook] INFO
>> >>>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
>> -
>> >>>> Shutting down TaskExecutorLocalStateStoresManager.
>> >>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>> >>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down
>> BLOB
>> >>>> cache
>> >>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook]
>> INFO
>> >>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>> >>>> FileChannelManager removed spill file directory
>> >>>>
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>> >>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>> >>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>> >>>> directory
>> >>>>
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>> >>>>
>> >>>> As the tail log of jobmanager.log is kind of lengthy, I have attached
>> >>>> it in this mail.
>> >>>>
>> >>>> From what I have seen, the TaskManager and JobManager shut down by
>> >>>> themselves, however, I have noticed some Netty exceptions (from the
>> stack
>> >>>> trace, it is part of the REST handler) like:
>> >>>>
>> >>>> ERROR
>> >>>>
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>> >>>>  - Failed to submit a listener notification task. Event loop shut
>> down?
>> >>>> java.util.concurrent.RejectedExecutionException: event executor
>> >>>> terminated
>> >>>>
>> >>>> Thus I suppose that these exceptions might be the actual cause of
>> >>>> premature termination of the REST server, and I am still looking
>> into the
>> >>>> real cause of this.
>> >>>>
>> >>>> Best,
>> >>>> Weike
>> >>>>
>> >>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com>
>> wrote:
>> >>>>
>> >>>>> Would you mind to share more information about why the task executor
>> >>>>> is killed? If it is killed by Yarn, you might get such info in Yarn
>> >>>>> NM/RM logs.
>> >>>>>
>> >>>>> Best,
>> >>>>> Yangze Guo
>> >>>>>
>> >>>>> Best,
>> >>>>> Yangze Guo
>> >>>>>
>> >>>>>
>> >>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
>> kyledong@connect.hku.hk>
>> >>>>> wrote:
>> >>>>> >
>> >>>>> > Hi,
>> >>>>> >
>> >>>>> > Recently I have encountered a strange behavior of Flink on YARN,
>> >>>>> which is that when I try to cancel a Flink job running in per-job
>> mode on
>> >>>>> YARN using commands like
>> >>>>> >
>> >>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>> >>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>> >>>>> >
>> >>>>> > the client happily found and connected to ResourceManager and then
>> >>>>> stucks at
>> >>>>> > Found Web Interface 172.28.28.3:50099 of application
>> >>>>> 'application_1559388106022_9412'.
>> >>>>> >
>> >>>>> > And after one minute, an exception is thrown at the client side:
>> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel
>> >>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
>> >>>>> >     at
>> >>>>>
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>> >>>>> >     at
>> >>>>>
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> >>>>> >     at
>> >>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>> >>>>> >     at
>> >>>>>
>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>> >>>>> >     at
>> >>>>>
>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>> >>>>> >     at java.security.AccessController.doPrivileged(Native Method)
>> >>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>> >>>>> >     at
>> >>>>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>> >>>>> >     ... 20 more
>> >>>>> > Caused by: java.util.concurrent.TimeoutException
>> >>>>> >     at
>> >>>>>
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> >>>>> >     at
>> >>>>>
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> >>>>> >     at
>> >>>>>
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>> >>>>> >     ... 27 more
>> >>>>> >
>> >>>>> > Then I discovered that the YARN app has already terminated with
>> >>>>> FINISHED state and KILLED final status, like below.
>> >>>>> >
>> >>>>> > And after digging into the log of this finished YARN app, I have
>> >>>>> found that TaskManager had already received the SIGTERM signal and
>> >>>>> terminated gracefully.
>> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL
>> 15:
>> >>>>> SIGTERM. Shutting down as requested.
>> >>>>> >
>> >>>>> > Also, the log of JobManager shows that it terminated with exit
>> code
>> >>>>> 0.
>> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint
>> with
>> >>>>> exit code 0
>> >>>>> >
>> >>>>> > However, the JobManager did not return anything to the client
>> before
>> >>>>> its shutdown, which is different from previous versions (like Flink
>> 1.9).
>> >>>>> >
>> >>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>> >>>>> module?
>> >>>>> >
>> >>>>> > Thank you : )
>> >>>>> >
>> >>>>> > Sincerely,
>> >>>>> > Weike
>> >>>>>
>> >>>>
>>
>

Re: Flink YARN app terminated before the client receives the result

Posted by Till Rohrmann <tr...@apache.org>.
 @Tison, could you create an issue to track the problem? Please also link
the uploaded log file for further debugging.

I think the reason why it worked in Flink 1.9 could have been that we had an
async callback in the longer chain which broke the flow of execution and
allowed the response to be sent. This is no longer the case. As an easy fix
one could change thenAccept into thenAcceptAsync in
MiniDispatcher#cancelJob. But this only treats the symptoms. Maybe we should
think about allowing handlers other than the StatusHandler to close
asynchronously. At the moment all other handlers shut down immediately (see
AbstractHandler#closeHandlerAsync). But the problem with this change would
be that all handlers would become stateful, because they would need to
remember whether a request is currently ongoing or not.
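
To make the thenAccept vs. thenAcceptAsync point concrete, here is a
self-contained sketch of the plain CompletableFuture behaviour (not Flink
code; shutDown stands in for completing the cluster's termination future and
sendResponse for the RestServer delivering the cancellation result). With
thenAccept the callback runs on the thread that completes the future, so the
shutdown side effect fires before the caller's own continuation; with
thenAcceptAsync the callback is handed to another executor, which breaks the
flow of execution:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class ThenAcceptVsAsyncSketch {

    public static void main(String[] args) throws Exception {
        System.out.println("--- thenAccept: shutDown runs before sendResponse ---");
        run(false);
        System.out.println("--- thenAcceptAsync: shutDown deferred to the common pool ---");
        run(true);
    }

    private static void run(boolean async) throws Exception {
        CompletableFuture<String> cancelFuture = new CompletableFuture<>();

        if (async) {
            cancelFuture.thenAcceptAsync(ack -> shutDown(), ForkJoinPool.commonPool());
        } else {
            cancelFuture.thenAccept(ack -> shutDown());
        }

        // Completing the future here runs a thenAccept callback synchronously
        // on this thread, i.e. before sendResponse below; thenAcceptAsync only
        // schedules it, so that ordering is no longer forced.
        cancelFuture.complete("Acknowledge");
        sendResponse(cancelFuture.get());

        Thread.sleep(100); // give the async callback time to print
    }

    private static void shutDown() {
        System.out.println("shutDown() on " + Thread.currentThread().getName());
    }

    private static void sendResponse(String ack) {
        System.out.println("sendResponse(" + ack + ") on " + Thread.currentThread().getName());
    }
}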

Cheers,
Till

On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <ky...@connect.hku.hk> wrote:

> Hi Tison & Till and all,
>
> I have uploaded the client, taskmanager and jobmanager log to Gist (
> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I
> can reproduce this bug every time when trying to cancel Flink 1.10 jobs on
> YARN.
>
> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling
> job with a savepoint *sometimes throws exceptions to the client side due to
> early shutdown of the server, even though the savepoint was successfully
> completed by reviewing the log, however when using the newly introduced
> *stop* API, that bug disappeared, however, *cancel* API seems to be buggy
> now.
>
> Best,
> Weike
>
> On Tue, Mar 17, 2020 at 10:17 AM tison <wa...@gmail.com> wrote:
>
> > edit: previously after the cancellation we have a longer call chain to
> > #jobReachedGloballyTerminalState which does the archive job & JM graceful
> > showdown, which might take some time so that ...
> >
> > Best,
> > tison.
> >
> >
> > tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:
> >
> >> Hi Weike & Till,
> >>
> >> I agree with Till and it is also the analysis from my side. However, it
> >> seems even if we don't have FLINK-15116, it is still possible that we
> >> complete the cancel future but the cluster got shutdown before it
> properly
> >> delivered the response.
> >>
> >> There is one thing strange that this behavior almost reproducible, it
> >> should be a possible order but not always. Maybe previous we have to
> >> firstly cancel the job which has a long call chain so that it happens we
> >> have enough time to delivered the response.
> >>
> >> But the resolution looks like we introduce some
> >> synchronization/finalization logics that clear these outstanding future
> >> with best effort before the cluster(RestServer) down.
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
> >>
> >>> Hi Weike,
> >>>
> >>> could you share the complete logs with us? Attachments are being
> >>> filtered out by the Apache mail server but it works if you upload the
> logs
> >>> somewhere (e.g. https://gist.github.com/) and then share the link with
> >>> us. Ideally you run the cluster with DEBUG log settings.
> >>>
> >>> I assume that you are running Flink 1.10, right?
> >>>
> >>> My suspicion is that this behaviour has been introduced with
> FLINK-15116
> >>> [1]. It looks as if we complete the shutdown future in
> >>> MiniDispatcher#cancelJob before we return the response to the
> >>> RestClusterClient. My guess is that this triggers the shutdown of the
> >>> RestServer which then is not able to serve the response to the client.
> I'm
> >>> pulling in Aljoscha and Tison who introduced this change. They might be
> >>> able to verify my theory and propose a solution for it.
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-15116
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <ky...@connect.hku.hk>
> >>> wrote:
> >>>
> >>>> Hi Yangze and all,
> >>>>
> >>>> I have tried numerous times, and this behavior persists.
> >>>>
> >>>> Below is the tail log of taskmanager.log:
> >>>>
> >>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
> >>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free
> slot
> >>>> TaskSlot(index:0, state:ACTIVE, resource profile:
> >>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
> >>>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
> >>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
> >>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
> >>>> d0a674795be98bd2574d9ea3286801cb).
> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
> >>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
> >>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
> JobManager
> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
> >>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close
> JobManager
> >>>> connection for job d0a674795be98bd2574d9ea3286801cb.
> >>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
> >>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot
> reconnect
> >>>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> >>>> SIGTERM. Shutting down as requested.
> >>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
> >>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> >>>> SIGTERM. Shutting down as requested.
> >>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
> >>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down
> BLOB
> >>>> cache
> >>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
> >>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl
> -
> >>>> FileChannelManager removed spill file directory
> >>>>
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
> >>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
> >>>> hook] INFO
> >>>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> >>>> Shutting down TaskExecutorLocalStateStoresManager.
> >>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
> >>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down
> BLOB
> >>>> cache
> >>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
> >>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
> >>>> FileChannelManager removed spill file directory
> >>>>
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
> >>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
> >>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
> >>>> directory
> >>>>
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
> >>>>
> >>>> As the tail log of jobmanager.log is kind of lengthy, I have attached
> >>>> it in this mail.
> >>>>
> >>>> From what I have seen, the TaskManager and JobManager shut down by
> >>>> themselves, however, I have noticed some Netty exceptions (from the
> stack
> >>>> trace, it is part of the REST handler) like:
> >>>>
> >>>> ERROR
> >>>>
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
> >>>>  - Failed to submit a listener notification task. Event loop shut
> down?
> >>>> java.util.concurrent.RejectedExecutionException: event executor
> >>>> terminated
> >>>>
> >>>> Thus I suppose that these exceptions might be the actual cause of
> >>>> premature termination of the REST server, and I am still looking into
> the
> >>>> real cause of this.
> >>>>
> >>>> Best,
> >>>> Weike
> >>>>
> >>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com>
> wrote:
> >>>>
> >>>>> Would you mind to share more information about why the task executor
> >>>>> is killed? If it is killed by Yarn, you might get such info in Yarn
> >>>>> NM/RM logs.
> >>>>>
> >>>>> Best,
> >>>>> Yangze Guo
> >>>>>
> >>>>> Best,
> >>>>> Yangze Guo
> >>>>>
> >>>>>
> >>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
> kyledong@connect.hku.hk>
> >>>>> wrote:
> >>>>> >
> >>>>> > Hi,
> >>>>> >
> >>>>> > Recently I have encountered a strange behavior of Flink on YARN,
> >>>>> which is that when I try to cancel a Flink job running in per-job
> mode on
> >>>>> YARN using commands like
> >>>>> >
> >>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
> >>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
> >>>>> >
> >>>>> > the client happily found and connected to ResourceManager and then
> >>>>> stucks at
> >>>>> > Found Web Interface 172.28.28.3:50099 of application
> >>>>> 'application_1559388106022_9412'.
> >>>>> >
> >>>>> > And after one minute, an exception is thrown at the client side:
> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel
> >>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> >>>>> >     at
> >>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
> >>>>> >     at java.security.AccessController.doPrivileged(Native Method)
> >>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>>> >     at
> >>>>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> >>>>> >     ... 20 more
> >>>>> > Caused by: java.util.concurrent.TimeoutException
> >>>>> >     at
> >>>>>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >>>>> >     at
> >>>>>
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
> >>>>> >     ... 27 more
> >>>>> >
> >>>>> > Then I discovered that the YARN app has already terminated with
> >>>>> FINISHED state and KILLED final status, like below.
> >>>>> >
> >>>>> > And after digging into the log of this finished YARN app, I have
> >>>>> found that TaskManager had already received the SIGTERM signal and
> >>>>> terminated gracefully.
> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> >>>>> SIGTERM. Shutting down as requested.
> >>>>> >
> >>>>> > Also, the log of JobManager shows that it terminated with exit code
> >>>>> 0.
> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint
> with
> >>>>> exit code 0
> >>>>> >
> >>>>> > However, the JobManager did not return anything to the client
> before
> >>>>> its shutdown, which is different from previous versions (like Flink
> 1.9).
> >>>>> >
> >>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
> >>>>> module?
> >>>>> >
> >>>>> > Thank you : )
> >>>>> >
> >>>>> > Sincerely,
> >>>>> > Weike
> >>>>>
> >>>>
>

> >>>>> 'application_1559388106022_9412'.
> >>>>> >
> >>>>> > And after one minute, an exception is thrown at the client side:
> >>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel
> >>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> >>>>> >     at
> >>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
> >>>>> >     at java.security.AccessController.doPrivileged(Native Method)
> >>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
> >>>>> >     at
> >>>>>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> >>>>> >     ... 20 more
> >>>>> > Caused by: java.util.concurrent.TimeoutException
> >>>>> >     at
> >>>>>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >>>>> >     at
> >>>>>
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >>>>> >     at
> >>>>>
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
> >>>>> >     ... 27 more
> >>>>> >
> >>>>> > Then I discovered that the YARN app has already terminated with
> >>>>> FINISHED state and KILLED final status, like below.
> >>>>> >
> >>>>> > And after digging into the log of this finished YARN app, I have
> >>>>> found that TaskManager had already received the SIGTERM signal and
> >>>>> terminated gracefully.
> >>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> >>>>> SIGTERM. Shutting down as requested.
> >>>>> >
> >>>>> > Also, the log of JobManager shows that it terminated with exit code
> >>>>> 0.
> >>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint
> with
> >>>>> exit code 0
> >>>>> >
> >>>>> > However, the JobManager did not return anything to the client
> before
> >>>>> its shutdown, which is different from previous versions (like Flink
> 1.9).
> >>>>> >
> >>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
> >>>>> module?
> >>>>> >
> >>>>> > Thank you : )
> >>>>> >
> >>>>> > Sincerely,
> >>>>> > Weike
> >>>>>
> >>>>
>

Re: Flink YARN app terminated before the client receives the result

Posted by "DONG, Weike" <ky...@connect.hku.hk>.
Hi Tison & Till and all,

I have uploaded the client, taskmanager and jobmanager logs to a Gist (
https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f), and I
can reproduce this bug every time I try to cancel Flink 1.10 jobs on
YARN.

Besides, in earlier Flink versions like 1.9, the REST API for *cancelling
a job with a savepoint* sometimes threw exceptions to the client side due
to an early shutdown of the server, even though the log shows that the
savepoint completed successfully. That problem disappeared with the newly
introduced *stop* API; now it is the *cancel* API that seems to be buggy.
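
For reference, here is a rough sketch of how these three operations look on
the Java client in 1.10. It is written from memory, so the constructor and
method signatures are an assumption and may differ between versions; it is
only meant to illustrate the calls being compared above.

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

public class CancelVsStopSketch {
    public static void main(String[] args) throws Exception {
        // The configuration would normally carry the REST address/port of the
        // YARN per-job cluster (resolved from the application id).
        Configuration config = new Configuration();
        try (RestClusterClient<String> client =
                 new RestClusterClient<>(config, "application_1559388106022_9412")) {
            JobID jobId = JobID.fromHexString("ed7e2e0ab0a7316c1b65df6047bc6aae");

            // plain cancel -- the operation discussed in this thread
            client.cancel(jobId).get();

            // cancel with savepoint -- the 1.9-era call mentioned above
            // client.cancelWithSavepoint(jobId, "hdfs:///flink/savepoints").get();

            // stop with savepoint -- the newer call that behaved correctly
            // client.stopWithSavepoint(jobId, false, "hdfs:///flink/savepoints").get();
        }
    }
}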

Best,
Weike

On Tue, Mar 17, 2020 at 10:17 AM tison <wa...@gmail.com> wrote:

> edit: previously after the cancellation we have a longer call chain to
> #jobReachedGloballyTerminalState which does the archive job & JM graceful
> showdown, which might take some time so that ...
>
> Best,
> tison.
>
>
> tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:
>
>> Hi Weike & Till,
>>
>> I agree with Till and it is also the analysis from my side. However, it
>> seems even if we don't have FLINK-15116, it is still possible that we
>> complete the cancel future but the cluster got shutdown before it properly
>> delivered the response.
>>
>> There is one thing strange that this behavior almost reproducible, it
>> should be a possible order but not always. Maybe previous we have to
>> firstly cancel the job which has a long call chain so that it happens we
>> have enough time to delivered the response.
>>
>> But the resolution looks like we introduce some
>> synchronization/finalization logics that clear these outstanding future
>> with best effort before the cluster(RestServer) down.
>>
>> Best,
>> tison.
>>
>>
>> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
>>
>>> Hi Weike,
>>>
>>> could you share the complete logs with us? Attachments are being
>>> filtered out by the Apache mail server but it works if you upload the logs
>>> somewhere (e.g. https://gist.github.com/) and then share the link with
>>> us. Ideally you run the cluster with DEBUG log settings.
>>>
>>> I assume that you are running Flink 1.10, right?
>>>
>>> My suspicion is that this behaviour has been introduced with FLINK-15116
>>> [1]. It looks as if we complete the shutdown future in
>>> MiniDispatcher#cancelJob before we return the response to the
>>> RestClusterClient. My guess is that this triggers the shutdown of the
>>> RestServer which then is not able to serve the response to the client. I'm
>>> pulling in Aljoscha and Tison who introduced this change. They might be
>>> able to verify my theory and propose a solution for it.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <ky...@connect.hku.hk>
>>> wrote:
>>>
>>>> Hi Yangze and all,
>>>>
>>>> I have tried numerous times, and this behavior persists.
>>>>
>>>> Below is the tail log of taskmanager.log:
>>>>
>>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
>>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>>>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>>> d0a674795be98bd2574d9ea3286801cb).
>>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
>>>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>>> SIGTERM. Shutting down as requested.
>>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>>> SIGTERM. Shutting down as requested.
>>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
>>>> cache
>>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
>>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>>> FileChannelManager removed spill file directory
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
>>>> hook] INFO
>>>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>>> Shutting down TaskExecutorLocalStateStoresManager.
>>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
>>>> cache
>>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>>> FileChannelManager removed spill file directory
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>>>> directory
>>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>>>
>>>> As the tail log of jobmanager.log is kind of lengthy, I have attached
>>>> it in this mail.
>>>>
>>>> From what I have seen, the TaskManager and JobManager shut down by
>>>> themselves, however, I have noticed some Netty exceptions (from the stack
>>>> trace, it is part of the REST handler) like:
>>>>
>>>> ERROR
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>>  - Failed to submit a listener notification task. Event loop shut down?
>>>> java.util.concurrent.RejectedExecutionException: event executor
>>>> terminated
>>>>
>>>> Thus I suppose that these exceptions might be the actual cause of
>>>> premature termination of the REST server, and I am still looking into the
>>>> real cause of this.
>>>>
>>>> Best,
>>>> Weike
>>>>
>>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com> wrote:
>>>>
>>>>> Would you mind to share more information about why the task executor
>>>>> is killed? If it is killed by Yarn, you might get such info in Yarn
>>>>> NM/RM logs.
>>>>>
>>>>> Best,
>>>>> Yangze Guo
>>>>>
>>>>> Best,
>>>>> Yangze Guo
>>>>>
>>>>>
>>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <ky...@connect.hku.hk>
>>>>> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > Recently I have encountered a strange behavior of Flink on YARN,
>>>>> which is that when I try to cancel a Flink job running in per-job mode on
>>>>> YARN using commands like
>>>>> >
>>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>>>> >
>>>>> > the client happily found and connected to ResourceManager and then
>>>>> stucks at
>>>>> > Found Web Interface 172.28.28.3:50099 of application
>>>>> 'application_1559388106022_9412'.
>>>>> >
>>>>> > And after one minute, an exception is thrown at the client side:
>>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel
>>>>> job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>>>> >     at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>>>> >     at
>>>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>>>> >     at
>>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>>>> >     at
>>>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>>>> >     at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>>>> >     at java.security.AccessController.doPrivileged(Native Method)
>>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>> >     at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>>> >     ... 20 more
>>>>> > Caused by: java.util.concurrent.TimeoutException
>>>>> >     at
>>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>>> >     at
>>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>>> >     at
>>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>>>> >     ... 27 more
>>>>> >
>>>>> > Then I discovered that the YARN app has already terminated with
>>>>> FINISHED state and KILLED final status, like below.
>>>>> >
>>>>> > And after digging into the log of this finished YARN app, I have
>>>>> found that TaskManager had already received the SIGTERM signal and
>>>>> terminated gracefully.
>>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>>>> SIGTERM. Shutting down as requested.
>>>>> >
>>>>> > Also, the log of JobManager shows that it terminated with exit code
>>>>> 0.
>>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint with
>>>>> exit code 0
>>>>> >
>>>>> > However, the JobManager did not return anything to the client before
>>>>> its shutdown, which is different from previous versions (like Flink 1.9).
>>>>> >
>>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>>>> module?
>>>>> >
>>>>> > Thank you : )
>>>>> >
>>>>> > Sincerely,
>>>>> > Weike
>>>>>
>>>>

Re: Flink YARN app terminated before the client receives the result

Posted by tison <wa...@gmail.com>.
edit: previously, after the cancellation there was a longer call chain to
#jobReachedGloballyTerminalState, which archives the job and shuts the JM
down gracefully; that might take enough time so that ...

Best,
tison.


tison <wa...@gmail.com> 于2020年3月17日周二 上午10:13写道:

> Hi Weike & Till,
>
> I agree with Till and it is also the analysis from my side. However, it
> seems even if we don't have FLINK-15116, it is still possible that we
> complete the cancel future but the cluster got shutdown before it properly
> delivered the response.
>
> There is one thing strange that this behavior almost reproducible, it
> should be a possible order but not always. Maybe previous we have to
> firstly cancel the job which has a long call chain so that it happens we
> have enough time to delivered the response.
>
> But the resolution looks like we introduce some
> synchronization/finalization logics that clear these outstanding future
> with best effort before the cluster(RestServer) down.
>
> Best,
> tison.
>
>
> Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:
>
>> Hi Weike,
>>
>> could you share the complete logs with us? Attachments are being filtered
>> out by the Apache mail server but it works if you upload the logs somewhere
>> (e.g. https://gist.github.com/) and then share the link with us. Ideally
>> you run the cluster with DEBUG log settings.
>>
>> I assume that you are running Flink 1.10, right?
>>
>> My suspicion is that this behaviour has been introduced with FLINK-15116
>> [1]. It looks as if we complete the shutdown future in
>> MiniDispatcher#cancelJob before we return the response to the
>> RestClusterClient. My guess is that this triggers the shutdown of the
>> RestServer which then is not able to serve the response to the client. I'm
>> pulling in Aljoscha and Tison who introduced this change. They might be
>> able to verify my theory and propose a solution for it.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <ky...@connect.hku.hk>
>> wrote:
>>
>>> Hi Yangze and all,
>>>
>>> I have tried numerous times, and this behavior persists.
>>>
>>> Below is the tail log of taskmanager.log:
>>>
>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
>>> TaskSlot(index:0, state:ACTIVE, resource profile:
>>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>>> d0a674795be98bd2574d9ea3286801cb).
>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>>> connection for job d0a674795be98bd2574d9ea3286801cb.
>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
>>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> SIGTERM. Shutting down as requested.
>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> SIGTERM. Shutting down as requested.
>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
>>> cache
>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
>>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>> FileChannelManager removed spill file directory
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
>>> hook] INFO
>>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>>> Shutting down TaskExecutorLocalStateStoresManager.
>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
>>> cache
>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>>> FileChannelManager removed spill file directory
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>>> directory
>>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>>
>>> As the tail log of jobmanager.log is kind of lengthy, I have attached it
>>> in this mail.
>>>
>>> From what I have seen, the TaskManager and JobManager shut down by
>>> themselves, however, I have noticed some Netty exceptions (from the stack
>>> trace, it is part of the REST handler) like:
>>>
>>> ERROR
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>  - Failed to submit a listener notification task. Event loop shut down?
>>> java.util.concurrent.RejectedExecutionException: event executor
>>> terminated
>>>
>>> Thus I suppose that these exceptions might be the actual cause of
>>> premature termination of the REST server, and I am still looking into the
>>> real cause of this.
>>>
>>> Best,
>>> Weike
>>>
>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com> wrote:
>>>
>>>> Would you mind to share more information about why the task executor
>>>> is killed? If it is killed by Yarn, you might get such info in Yarn
>>>> NM/RM logs.
>>>>
>>>> Best,
>>>> Yangze Guo
>>>>
>>>> Best,
>>>> Yangze Guo
>>>>
>>>>
>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <ky...@connect.hku.hk>
>>>> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Recently I have encountered a strange behavior of Flink on YARN,
>>>> which is that when I try to cancel a Flink job running in per-job mode on
>>>> YARN using commands like
>>>> >
>>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>>> >
>>>> > the client happily found and connected to ResourceManager and then
>>>> stucks at
>>>> > Found Web Interface 172.28.28.3:50099 of application
>>>> 'application_1559388106022_9412'.
>>>> >
>>>> > And after one minute, an exception is thrown at the client side:
>>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel job
>>>> ed7e2e0ab0a7316c1b65df6047bc6aae.
>>>> >     at
>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>>> >     at
>>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>>> >     at
>>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>>> >     at
>>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>>> >     at
>>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>>> >     at java.security.AccessController.doPrivileged(Native Method)
>>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>> >     at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>> >     ... 20 more
>>>> > Caused by: java.util.concurrent.TimeoutException
>>>> >     at
>>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>> >     at
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>> >     at
>>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>>> >     ... 27 more
>>>> >
>>>> > Then I discovered that the YARN app has already terminated with
>>>> FINISHED state and KILLED final status, like below.
>>>> >
>>>> > And after digging into the log of this finished YARN app, I have
>>>> found that TaskManager had already received the SIGTERM signal and
>>>> terminated gracefully.
>>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>>> SIGTERM. Shutting down as requested.
>>>> >
>>>> > Also, the log of JobManager shows that it terminated with exit code 0.
>>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint with
>>>> exit code 0
>>>> >
>>>> > However, the JobManager did not return anything to the client before
>>>> its shutdown, which is different from previous versions (like Flink 1.9).
>>>> >
>>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>>> module?
>>>> >
>>>> > Thank you : )
>>>> >
>>>> > Sincerely,
>>>> > Weike
>>>>
>>>

Re: Flink YARN app terminated before the client receives the result

Posted by tison <wa...@gmail.com>.
Hi Weike & Till,

I agree with Till, and that is also my analysis. However, it seems that
even without FLINK-15116 it would still be possible for us to complete the
cancel future while the cluster shuts down before it has properly
delivered the response.
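
For what it's worth, the ordering problem can be reproduced with a tiny,
self-contained toy in plain Java (this is not Flink code, all names are made
up): once the "shutdown" side tears down the executor that would deliver the
response, the delivery is rejected, which matches the "event executor
terminated" errors in Weike's logs.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ShutdownRaceToy {
    public static void main(String[] args) throws Exception {
        ExecutorService restServerExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> shutDownFuture = new CompletableFuture<>();

        // Completing the "shutdown" future tears down the executor that would
        // have written the HTTP response (it stands in for the RestServer).
        shutDownFuture.thenRun(restServerExecutor::shutdownNow);

        CompletableFuture<String> response = new CompletableFuture<>();

        // The cancel handler completes the shutdown future *before* the
        // response has been handed over for delivery.
        shutDownFuture.complete(null);

        try {
            restServerExecutor.execute(() -> response.complete("Job cancelled"));
        } catch (RejectedExecutionException e) {
            // Corresponds to the "event executor terminated" errors in the logs.
            System.out.println("Response was never delivered: " + e);
        }

        restServerExecutor.awaitTermination(1, TimeUnit.SECONDS);
    }
}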

One strange thing is that this behavior is almost always reproducible,
whereas it should be only one possible ordering and not the rule. Maybe
previously we first had to cancel the job, which involved a long call
chain, so there happened to be enough time to deliver the response.

But the resolution looks like introducing some synchronization/finalization
logic that clears these outstanding futures on a best-effort basis before
the cluster (RestServer) goes down.
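
As a purely hypothetical sketch of that idea (none of these names exist in
Flink), the RestServer could track its outstanding response futures and the
cluster entrypoint could delay its shutdown until they have completed, with
some timeout as a safety net:

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: illustrates "flush outstanding responses before shutdown".
class OutstandingResponseTracker {

    private final Set<CompletableFuture<?>> inFlight = ConcurrentHashMap.newKeySet();

    // A handler registers the future of every response it is about to send.
    <T> CompletableFuture<T> track(CompletableFuture<T> response) {
        inFlight.add(response);
        response.whenComplete((result, error) -> inFlight.remove(response));
        return response;
    }

    // Completes once every tracked response has been delivered or has failed.
    // Best effort only: responses registered after this snapshot still rely
    // on the timeout applied by the caller.
    CompletableFuture<Void> allDelivered() {
        return CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0]));
    }
}

// Cluster shutdown could then be chained roughly like (pseudo-code, the
// methods on restServer and the shutDownFuture type are assumptions):
//
//   tracker.allDelivered()                        // bounded by a grace period
//          .thenCompose(ignored -> restServer.closeAsync())
//          .thenRun(() -> shutDownFuture.complete(applicationStatus));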

Best,
tison.


Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:

> Hi Weike,
>
> could you share the complete logs with us? Attachments are being filtered
> out by the Apache mail server but it works if you upload the logs somewhere
> (e.g. https://gist.github.com/) and then share the link with us. Ideally
> you run the cluster with DEBUG log settings.
>
> I assume that you are running Flink 1.10, right?
>
> My suspicion is that this behaviour has been introduced with FLINK-15116
> [1]. It looks as if we complete the shutdown future in
> MiniDispatcher#cancelJob before we return the response to the
> RestClusterClient. My guess is that this triggers the shutdown of the
> RestServer which then is not able to serve the response to the client. I'm
> pulling in Aljoscha and Tison who introduced this change. They might be
> able to verify my theory and propose a solution for it.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15116
>
> Cheers,
> Till
>
> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <ky...@connect.hku.hk>
> wrote:
>
>> Hi Yangze and all,
>>
>> I have tried numerous times, and this behavior persists.
>>
>> Below is the tail log of taskmanager.log:
>>
>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
>> TaskSlot(index:0, state:ACTIVE, resource profile:
>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>> d0a674795be98bd2574d9ea3286801cb).
>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
>> cache
>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>> FileChannelManager removed spill file directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
>> hook] INFO
>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>> Shutting down TaskExecutorLocalStateStoresManager.
>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
>> cache
>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>> FileChannelManager removed spill file directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>> directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>
>> As the tail log of jobmanager.log is kind of lengthy, I have attached it
>> in this mail.
>>
>> From what I have seen, the TaskManager and JobManager shut down by
>> themselves, however, I have noticed some Netty exceptions (from the stack
>> trace, it is part of the REST handler) like:
>>
>> ERROR
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>  - Failed to submit a listener notification task. Event loop shut down?
>> java.util.concurrent.RejectedExecutionException: event executor terminated
>>
>> Thus I suppose that these exceptions might be the actual cause of
>> premature termination of the REST server, and I am still looking into the
>> real cause of this.
>>
>> Best,
>> Weike
>>
>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com> wrote:
>>
>>> Would you mind to share more information about why the task executor
>>> is killed? If it is killed by Yarn, you might get such info in Yarn
>>> NM/RM logs.
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> Best,
>>> Yangze Guo
>>>
>>>
>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <ky...@connect.hku.hk>
>>> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Recently I have encountered a strange behavior of Flink on YARN, which
>>> is that when I try to cancel a Flink job running in per-job mode on YARN
>>> using commands like
>>> >
>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>> >
>>> > the client happily found and connected to ResourceManager and then
>>> stucks at
>>> > Found Web Interface 172.28.28.3:50099 of application
>>> 'application_1559388106022_9412'.
>>> >
>>> > And after one minute, an exception is thrown at the client side:
>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel job
>>> ed7e2e0ab0a7316c1b65df6047bc6aae.
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>> >     at java.security.AccessController.doPrivileged(Native Method)
>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>> >     at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>> >     ... 20 more
>>> > Caused by: java.util.concurrent.TimeoutException
>>> >     at
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>> >     at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>> >     ... 27 more
>>> >
>>> > Then I discovered that the YARN app has already terminated with
>>> FINISHED state and KILLED final status, like below.
>>> >
>>> > And after digging into the log of this finished YARN app, I have found
>>> that TaskManager had already received the SIGTERM signal and terminated
>>> gracefully.
>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> SIGTERM. Shutting down as requested.
>>> >
>>> > Also, the log of JobManager shows that it terminated with exit code 0.
>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint with
>>> exit code 0
>>> >
>>> > However, the JobManager did not return anything to the client before
>>> its shutdown, which is different from previous versions (like Flink 1.9).
>>> >
>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>> module?
>>> >
>>> > Thank you : )
>>> >
>>> > Sincerely,
>>> > Weike
>>>
>>

Re: Flink YARN app terminated before the client receives the result

Posted by tison <wa...@gmail.com>.
Hi Weike & Till,

I agree with Till and it is also the analysis from my side. However, it
seems even if we don't have FLINK-15116, it is still possible that we
complete the cancel future but the cluster got shutdown before it properly
delivered the response.

There is one thing strange that this behavior almost reproducible, it
should be a possible order but not always. Maybe previous we have to
firstly cancel the job which has a long call chain so that it happens we
have enough time to delivered the response.

But the resolution looks like we introduce some
synchronization/finalization logics that clear these outstanding future
with best effort before the cluster(RestServer) down.

Best,
tison.


Till Rohrmann <tr...@apache.org> 于2020年3月17日周二 上午4:12写道:

> Hi Weike,
>
> could you share the complete logs with us? Attachments are being filtered
> out by the Apache mail server but it works if you upload the logs somewhere
> (e.g. https://gist.github.com/) and then share the link with us. Ideally
> you run the cluster with DEBUG log settings.
>
> I assume that you are running Flink 1.10, right?
>
> My suspicion is that this behaviour has been introduced with FLINK-15116
> [1]. It looks as if we complete the shutdown future in
> MiniDispatcher#cancelJob before we return the response to the
> RestClusterClient. My guess is that this triggers the shutdown of the
> RestServer which then is not able to serve the response to the client. I'm
> pulling in Aljoscha and Tison who introduced this change. They might be
> able to verify my theory and propose a solution for it.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15116
>
> Cheers,
> Till
>
> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <ky...@connect.hku.hk>
> wrote:
>
>> Hi Yangze and all,
>>
>> I have tried numerous times, and this behavior persists.
>>
>> Below is the tail log of taskmanager.log:
>>
>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
>> TaskSlot(index:0, state:ACTIVE, resource profile:
>> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
>> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
>> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
>> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
>> d0a674795be98bd2574d9ea3286801cb).
>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
>> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
>> connection for job d0a674795be98bd2574d9ea3286801cb.
>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
>> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
>> cache
>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
>> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>> FileChannelManager removed spill file directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
>> hook] INFO
>>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
>> Shutting down TaskExecutorLocalStateStoresManager.
>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
>> cache
>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
>> FileChannelManager removed spill file directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
>> directory
>> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>
>> As the tail log of jobmanager.log is kind of lengthy, I have attached it
>> in this mail.
>>
>> From what I have seen, the TaskManager and JobManager shut down by
>> themselves, however, I have noticed some Netty exceptions (from the stack
>> trace, it is part of the REST handler) like:
>>
>> ERROR
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>  - Failed to submit a listener notification task. Event loop shut down?
>> java.util.concurrent.RejectedExecutionException: event executor terminated
>>
>> Thus I suppose that these exceptions might be the actual cause of
>> premature termination of the REST server, and I am still looking into the
>> real cause of this.
>>
>> Best,
>> Weike
>>
>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com> wrote:
>>
>>> Would you mind to share more information about why the task executor
>>> is killed? If it is killed by Yarn, you might get such info in Yarn
>>> NM/RM logs.
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> Best,
>>> Yangze Guo
>>>
>>>
>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <ky...@connect.hku.hk>
>>> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Recently I have encountered a strange behavior of Flink on YARN, which
>>> is that when I try to cancel a Flink job running in per-job mode on YARN
>>> using commands like
>>> >
>>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>>> >
>>> > the client happily found and connected to ResourceManager and then
>>> stucks at
>>> > Found Web Interface 172.28.28.3:50099 of application
>>> 'application_1559388106022_9412'.
>>> >
>>> > And after one minute, an exception is thrown at the client side:
>>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel job
>>> ed7e2e0ab0a7316c1b65df6047bc6aae.
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>> >     at java.security.AccessController.doPrivileged(Native Method)
>>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>>> >     at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>> >     ... 20 more
>>> > Caused by: java.util.concurrent.TimeoutException
>>> >     at
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>> >     at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>> >     at
>>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>> >     ... 27 more
>>> >
>>> > Then I discovered that the YARN app has already terminated with
>>> FINISHED state and KILLED final status, like below.
>>> >
>>> > And after digging into the log of this finished YARN app, I have found
>>> that TaskManager had already received the SIGTERM signal and terminated
>>> gracefully.
>>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>>> SIGTERM. Shutting down as requested.
>>> >
>>> > Also, the log of JobManager shows that it terminated with exit code 0.
>>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint with
>>> exit code 0
>>> >
>>> > However, the JobManager did not return anything to the client before
>>> its shutdown, which is different from previous versions (like Flink 1.9).
>>> >
>>> > I wonder if this is a new bug on the flink-clients or flink-yarn
>>> module?
>>> >
>>> > Thank you : )
>>> >
>>> > Sincerely,
>>> > Weike
>>>
>>

Re: Flink YARN app terminated before the client receives the result

Posted by Till Rohrmann <tr...@apache.org>.
Hi Weike,

could you share the complete logs with us? Attachments are filtered out
by the Apache mail server, but it works if you upload the logs somewhere
(e.g. https://gist.github.com/) and then share the link with us. Ideally,
you would run the cluster with DEBUG log settings.

I assume that you are running Flink 1.10, right?

My suspicion is that this behaviour has been introduced with FLINK-15116
[1]. It looks as if we complete the shutdown future in
MiniDispatcher#cancelJob before we return the response to the
RestClusterClient. My guess is that this triggers the shutdown of the
RestServer which then is not able to serve the response to the client. I'm
pulling in Aljoscha and Tison who introduced this change. They might be
able to verify my theory and propose a solution for it.

[1] https://issues.apache.org/jira/browse/FLINK-15116
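
To make the suspected ordering concrete, the race would look roughly like this
(a simplified sketch for illustration only, not the real MiniDispatcher source;
the member names are assumed):

// simplified sketch of the suspected ordering in per-job cancel handling
CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
    CompletableFuture<Acknowledge> ack = jobManagerRunner.cancel(jobId, timeout);
    // completing the shutdown future here can tear down the RestServer before
    // 'ack' has been serialized and sent back to the RestClusterClient
    shutDownFuture.complete(ApplicationStatus.CANCELED);
    return ack;
}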

Cheers,
Till

On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <ky...@connect.hku.hk> wrote:

> Hi Yangze and all,
>
> I have tried numerous times, and this behavior persists.
>
> Below is the tail log of taskmanager.log:
>
> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
> TaskSlot(index:0, state:ACTIVE, resource profile:
> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
> (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
> (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
> allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
> d0a674795be98bd2574d9ea3286801cb).
> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
> d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
> connection for job d0a674795be98bd2574d9ea3286801cb.
> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
> connection for job d0a674795be98bd2574d9ea3286801cb.
> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
>  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
> to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> SIGTERM. Shutting down as requested.
> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO
>  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> SIGTERM. Shutting down as requested.
> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
>  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
> cache
> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
> hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
> FileChannelManager removed spill file directory
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown
> hook] INFO
>  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Shutting down TaskExecutorLocalStateStoresManager.
> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
>  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
> cache
> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
>  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
> FileChannelManager removed spill file directory
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
>  org.apache.flink.runtime.filecache.FileCache  - removed file cache
> directory
> /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>
> As the tail log of jobmanager.log is kind of lengthy, I have attached it
> in this mail.
>
> From what I have seen, the TaskManager and JobManager shut down by
> themselves, however, I have noticed some Netty exceptions (from the stack
> trace, it is part of the REST handler) like:
>
> ERROR
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>  - Failed to submit a listener notification task. Event loop shut down?
> java.util.concurrent.RejectedExecutionException: event executor terminated
>
> Thus I suppose that these exceptions might be the actual cause of
> premature termination of the REST server, and I am still looking into the
> real cause of this.
>
> Best,
> Weike
>
> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com> wrote:
>
>> Would you mind to share more information about why the task executor
>> is killed? If it is killed by Yarn, you might get such info in Yarn
>> NM/RM logs.
>>
>> Best,
>> Yangze Guo
>>
>> Best,
>> Yangze Guo
>>
>>
>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <ky...@connect.hku.hk>
>> wrote:
>> >
>> > Hi,
>> >
>> > Recently I have encountered a strange behavior of Flink on YARN, which
>> is that when I try to cancel a Flink job running in per-job mode on YARN
>> using commands like
>> >
>> > "cancel -m yarn-cluster -yid application_1559388106022_9412
>> ed7e2e0ab0a7316c1b65df6047bc6aae"
>> >
>> > the client happily found and connected to ResourceManager and then
>> stucks at
>> > Found Web Interface 172.28.28.3:50099 of application
>> 'application_1559388106022_9412'.
>> >
>> > And after one minute, an exception is thrown at the client side:
>> > Caused by: org.apache.flink.util.FlinkException: Could not cancel job
>> ed7e2e0ab0a7316c1b65df6047bc6aae.
>> >     at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>> >     at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> >     at
>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>> >     at
>> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>> >     at
>> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>> >     at java.security.AccessController.doPrivileged(Native Method)
>> >     at javax.security.auth.Subject.doAs(Subject.java:422)
>> >     at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>> >     ... 20 more
>> > Caused by: java.util.concurrent.TimeoutException
>> >     at
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> >     at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> >     at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>> >     ... 27 more
>> >
>> > Then I discovered that the YARN app has already terminated with
>> FINISHED state and KILLED final status, like below.
>> >
>> > And after digging into the log of this finished YARN app, I have found
>> that TaskManager had already received the SIGTERM signal and terminated
>> gracefully.
>> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> >
>> > Also, the log of JobManager shows that it terminated with exit code 0.
>> > Terminating cluster entrypoint process YarnJobClusterEntrypoint with
>> exit code 0
>> >
>> > However, the JobManager did not return anything to the client before
>> its shutdown, which is different from previous versions (like Flink 1.9).
>> >
>> > I wonder if this is a new bug on the flink-clients or flink-yarn module?
>> >
>> > Thank you : )
>> >
>> > Sincerely,
>> > Weike
>>
>

Re: Flink YARN app terminated before the client receives the result

Posted by "DONG, Weike" <ky...@connect.hku.hk>.
Hi Yangze and all,

I have tried numerous times, and this behavior persists.

Below is the tail log of taskmanager.log:

2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot
TaskSlot(index:0, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb
(1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb
(1505922928 bytes), networkMemory=359.040mb (376480732 bytes)},
allocationId: d3acaeac3db62454742e800b5410adfd, jobId:
d0a674795be98bd2574d9ea3286801cb).
2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job
d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
connection for job d0a674795be98bd2574d9ea3286801cb.
2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager
connection for job d0a674795be98bd2574d9ea3286801cb.
2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect
to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
2020-03-13 12:06:19.744 [SIGTERM handler] INFO
 org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
SIGTERM. Shutting down as requested.
2020-03-13 12:06:19.744 [SIGTERM handler] INFO
 org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
SIGTERM. Shutting down as requested.
2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO
 org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB
cache
2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown
hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
FileChannelManager removed spill file directory
/data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown hook]
INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
Shutting down TaskExecutorLocalStateStoresManager.
2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO
 org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB
cache
2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO
 org.apache.flink.runtime.io.disk.FileChannelManagerImpl  -
FileChannelManager removed spill file directory
/data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO
 org.apache.flink.runtime.filecache.FileCache  - removed file cache
directory
/data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31

As the tail log of jobmanager.log is kind of lengthy, I have attached it in
this mail.

From what I have seen, the TaskManager and JobManager shut down by
themselves; however, I have noticed some Netty exceptions (judging from
the stack trace, they come from the REST handler) like:

ERROR
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
 - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated

Thus I suppose that these exceptions might be the actual cause of the
premature termination of the REST server, and I am still looking into
the root cause.
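
For reference, that is the kind of exception Netty throws when a task (e.g. a
future listener) is submitted to an event executor that has already been
terminated. A minimal standalone reproduction (plain Netty, only to show the
mechanism, not Flink code) could look like:

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.RejectedExecutionException;

public class TerminatedExecutorDemo {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup(1);
        group.shutdownGracefully().syncUninterruptibly();  // event loop is now terminated
        try {
            group.submit(() -> System.out.println("listener task"));  // gets rejected
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());  // typically "event executor terminated"
        }
    }
}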

Best,
Weike

On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <ka...@gmail.com> wrote:

> Would you mind to share more information about why the task executor
> is killed? If it is killed by Yarn, you might get such info in Yarn
> NM/RM logs.
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <ky...@connect.hku.hk>
> wrote:
> >
> > Hi,
> >
> > Recently I have encountered a strange behavior of Flink on YARN, which
> is that when I try to cancel a Flink job running in per-job mode on YARN
> using commands like
> >
> > "cancel -m yarn-cluster -yid application_1559388106022_9412
> ed7e2e0ab0a7316c1b65df6047bc6aae"
> >
> > the client happily found and connected to ResourceManager and then
> stucks at
> > Found Web Interface 172.28.28.3:50099 of application
> 'application_1559388106022_9412'.
> >
> > And after one minute, an exception is thrown at the client side:
> > Caused by: org.apache.flink.util.FlinkException: Could not cancel job
> ed7e2e0ab0a7316c1b65df6047bc6aae.
> >     at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
> >     at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> >     at
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
> >     at
> org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
> >     at
> org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:422)
> >     at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> >     ... 20 more
> > Caused by: java.util.concurrent.TimeoutException
> >     at
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> >     at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> >     at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
> >     ... 27 more
> >
> > Then I discovered that the YARN app has already terminated with FINISHED
> state and KILLED final status, like below.
> >
> > And after digging into the log of this finished YARN app, I have found
> that TaskManager had already received the SIGTERM signal and terminated
> gracefully.
> > org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15:
> SIGTERM. Shutting down as requested.
> >
> > Also, the log of JobManager shows that it terminated with exit code 0.
> > Terminating cluster entrypoint process YarnJobClusterEntrypoint with
> exit code 0
> >
> > However, the JobManager did not return anything to the client before its
> shutdown, which is different from previous versions (like Flink 1.9).
> >
> > I wonder if this is a new bug on the flink-clients or flink-yarn module?
> >
> > Thank you : )
> >
> > Sincerely,
> > Weike
>

Re: Flink YARN app terminated before the client receives the result

Posted by Yangze Guo <ka...@gmail.com>.
Would you mind sharing more information about why the task executor is
killed? If it is killed by Yarn, you might find such info in the Yarn
NM/RM logs.
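
For example, assuming log aggregation is enabled, the aggregated container
logs can be fetched with

yarn logs -applicationId <application id>

and the reason for a kill issued by YARN itself would usually show up in the
NodeManager/ResourceManager daemon logs on the nodes.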

Best,
Yangze Guo


On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <ky...@connect.hku.hk> wrote:
>
> Hi,
>
> Recently I have encountered a strange behavior of Flink on YARN, which is that when I try to cancel a Flink job running in per-job mode on YARN using commands like
>
> "cancel -m yarn-cluster -yid application_1559388106022_9412 ed7e2e0ab0a7316c1b65df6047bc6aae"
>
> the client happily found and connected to ResourceManager and then stucks at
> Found Web Interface 172.28.28.3:50099 of application 'application_1559388106022_9412'.
>
> And after one minute, an exception is thrown at the client side:
> Caused by: org.apache.flink.util.FlinkException: Could not cancel job ed7e2e0ab0a7316c1b65df6047bc6aae.
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>     at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>     at org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>     at org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>     ... 20 more
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>     ... 27 more
>
> Then I discovered that the YARN app has already terminated with FINISHED state and KILLED final status, like below.
>
> And after digging into the log of this finished YARN app, I have found that TaskManager had already received the SIGTERM signal and terminated gracefully.
> org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>
> Also, the log of JobManager shows that it terminated with exit code 0.
> Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0
>
> However, the JobManager did not return anything to the client before its shutdown, which is different from previous versions (like Flink 1.9).
>
> I wonder if this is a new bug on the flink-clients or flink-yarn module?
>
> Thank you : )
>
> Sincerely,
> Weike
