Posted to user@flink.apache.org by "LINZ, Arnaud" <AL...@bouyguestelecom.fr> on 2020/11/16 12:55:28 UTC

Random Task executor shutdown

(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a YARN cluster. I have a streaming application that, under heavy load, fails from time to time, with this as the only error message in the whole YARN log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried increasing the value of heartbeat.timeout, thinking that it might be due to a slow-responding mapper, but it did not solve the issue.
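For reference, the change is of this form, e.g. in flink-conf.yaml (a sketch only; the values below are illustrative, not the ones actually deployed):

    heartbeat.timeout: 120000    # JobManager/TaskManager heartbeat timeout, in ms (Flink default: 50000)
    heartbeat.interval: 10000    # interval between heartbeat requests, in ms (Flink default: 10000)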

Best regards,
Arnaud


RE: Random Task executor shutdown

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,
It’s 3.4.10 and does contain the bug. I’ll patch my Flink client and see if it happens again.
Best regards,
Arnaud

From: LINZ, Arnaud
Sent: Wednesday, 18 November 2020 10:35
To: 'Guowei Ma' <gu...@gmail.com>
Cc: 'user' <us...@flink.apache.org>
Subject: RE: Random Task executor shutdown

Hello,

We are wondering whether or not it is related to https://issues.apache.org/jira/browse/ZOOKEEPER-2775.
What is the version of the shaded ZooKeeper client in Flink 1.10.0?
Best,
Arnaud

From: LINZ, Arnaud
Sent: Wednesday, 18 November 2020 09:39
To: 'Guowei Ma' <gu...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: RE: Random Task executor shutdown

Hello,

Actually, the log is more complete when the application ends, and it’s a ZooKeeper-related issue.
I took another log.
Job Manager’s log:

(…)
2020-11-12 14:34:09,798 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 73 from task d0d19968414641878c35c392e2062c59 of job b62ad2008af85add0ff9e6680be0cc42 at container_e38_1604477334666_0733_01_000003 @ XXX (dataPort=33692).
2020-11-12 14:34:15,015 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0733_01_000005 because: The TaskExecutor is shutting down.
2020-11-12 14:34:15,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (13/15) (4010b02f9f8094b38f182d1f55b9be4b) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-12 14:34:15,111 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_12.
(…)

The log of the shutting-down task executor is:

(…)
2020-11-12 14:34:14,497 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.
2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
2020-11-12 14:34:14,880 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session
2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000
2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:34:14,886 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Xid out of order. Got Xid 228 with err 0 expected Xid 229 for a packet with details: clientPath:null serverPath:null finished:false header:: 229,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
2020-11-12 14:34:14,986 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,988 ERROR org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background operation retry gave up
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
(…)
2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96).
2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: Closing task slot table
    at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.closeAsync(TaskSlotTableImpl.java:160)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:375)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
(…)
Then there are plenty of other errors.

Currently I have: high-availability.zookeeper.client.session-timeout: 300000
But between the first error message:
2020-11-12 14:33:43,548 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.
2020-11-12 14:33:43,563 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
2020-11-12 14:33:43,564 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session
2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000
2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
2020-11-12 14:33:43,569 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:33:43,570 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:33:43,592 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Xid out of order. Got Xid 37 with err 0 expected Xid 38 for a packet with details: clientPath:null serverPath:null finished:false header:: 38,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

and the last one, there is only one minute. Are there other parameters to adjust to make the ZooKeeper synchronization more robust when the network is slowed down?
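As a sketch of the knobs on the Flink side (shown here with the Flink 1.10 defaults; whether tuning them helps is exactly my question):

    high-availability.zookeeper.client.session-timeout: 60000      # requested ZooKeeper session timeout, in ms
    high-availability.zookeeper.client.connection-timeout: 15000   # ZooKeeper connection timeout, in ms
    high-availability.zookeeper.client.retry-wait: 5000            # pause between connection retries, in ms
    high-availability.zookeeper.client.max-retry-attempts: 3       # connection retries before the client gives up

Note that the log above says "negotiated timeout = 60000": the ZooKeeper server caps the requested session timeout at its own maxSessionTimeout, so the 300000 ms value apparently does not take effect on the server side.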

Best,
Arnaud


From: Guowei Ma <gu...@gmail.com>
Sent: Tuesday, 17 November 2020 00:49
To: LINZ, Arnaud <AL...@bouyguestelecom.fr>
Cc: user <us...@flink.apache.org>
Subject: Re: Random Task executor shutdown

Hi, Arnaud
Would you like to share the log of the shutdown task executor?
BTW could you check the gc log of the task executor?
Best,
Guowei
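For a TaskManager on YARN, GC logging can be switched on with something along these lines, e.g. in flink-conf.yaml (a sketch only; the log path is a placeholder and the exact flags depend on the JVM in use):

    env.java.opts.taskmanager: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/path/to/taskmanager-gc.log"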


On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a YARN cluster. I have a streaming application that, under heavy load, fails from time to time, with this as the only error message in the whole YARN log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried increasing the value of heartbeat.timeout, thinking that it might be due to a slow-responding mapper, but it did not solve the issue.

Best regards,
Arnaud


RE: Random Task executor shutdown

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hello,

We are wondering whether or not it is related to https://issues.apache.org/jira/browse/ZOOKEEPER-2775.
What is the version of the shaded ZooKeeper client in Flink 1.10.0?
Best,
Arnaud

From: LINZ, Arnaud
Sent: Wednesday, 18 November 2020 09:39
To: 'Guowei Ma' <gu...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: RE: Random Task executor shutdown

Hello,

Actually, the log is more complete when the application ends, and it’s a ZooKeeper-related issue.
I took another log.
Job Manager’s log:

(…)
2020-11-12 14:34:09,798 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 73 from task d0d19968414641878c35c392e2062c59 of job b62ad2008af85add0ff9e6680be0cc42 at container_e38_1604477334666_0733_01_000003 @ XXX (dataPort=33692).
2020-11-12 14:34:15,015 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0733_01_000005 because: The TaskExecutor is shutting down.
2020-11-12 14:34:15,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (13/15) (4010b02f9f8094b38f182d1f55b9be4b) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-12 14:34:15,111 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_12.
(…)

The log of the shutting-down task executor is:

(…)
2020-11-12 14:34:14,497 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.
2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
2020-11-12 14:34:14,880 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session
2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000
2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:34:14,886 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Xid out of order. Got Xid 228 with err 0 expected Xid 229 for a packet with details: clientPath:null serverPath:null finished:false header:: 229,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
2020-11-12 14:34:14,986 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,988 ERROR org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background operation retry gave up
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
(…)
2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96).
2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: Closing task slot table
    at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.closeAsync(TaskSlotTableImpl.java:160)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:375)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
(…)
Then there are plenty of other errors.

Currently I have: high-availability.zookeeper.client.session-timeout: 300000
But between the first error message:
2020-11-12 14:33:43,548 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.
2020-11-12 14:33:43,563 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
2020-11-12 14:33:43,564 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session
2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000
2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
2020-11-12 14:33:43,569 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:33:43,570 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:33:43,592 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Xid out of order. Got Xid 37 with err 0 expected Xid 38 for a packet with details: clientPath:null serverPath:null finished:false header:: 38,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

and the last one, there is only one minute. Are there other parameters to adjust to make the ZooKeeper synchronization more robust when the network is slowed down?

Best,
Arnaud


From: Guowei Ma <gu...@gmail.com>
Sent: Tuesday, 17 November 2020 00:49
To: LINZ, Arnaud <AL...@bouyguestelecom.fr>
Cc: user <us...@flink.apache.org>
Subject: Re: Random Task executor shutdown

Hi, Arnaud
Would you like to share the log of the shutdown task executor?
BTW could you check the gc log of the task executor?
Best,
Guowei


On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a YARN cluster. I have a streaming application that, under heavy load, fails from time to time, with this as the only error message in the whole YARN log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried increasing the value of heartbeat.timeout, thinking that it might be due to a slow-responding mapper, but it did not solve the issue.

Best regards,
Arnaud


RE: Random Task executor shutdown

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hello,

Actually, the log is more complete when the application ends, and it’s a ZooKeeper-related issue.
I took another log.
Job Manager’s log:

(…)
2020-11-12 14:34:09,798 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 73 from task d0d19968414641878c35c392e2062c59 of job b62ad2008af85add0ff9e6680be0cc42 at container_e38_1604477334666_0733_01_000003 @ XXX (dataPort=33692).
2020-11-12 14:34:15,015 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0733_01_000005 because: The TaskExecutor is shutting down.
2020-11-12 14:34:15,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (13/15) (4010b02f9f8094b38f182d1f55b9be4b) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-12 14:34:15,111 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_12.
(…)

The log of the shutting-down task executor is:

(…)
2020-11-12 14:34:14,497 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.
2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
2020-11-12 14:34:14,880 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session
2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000
2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:34:14,886 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Xid out of order. Got Xid 228 with err 0 expected Xid 229 for a packet with details: clientPath:null serverPath:null finished:false header:: 229,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
2020-11-12 14:34:14,986 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-12 14:34:14,988 ERROR org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background operation retry gave up
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
(…)
2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96).
2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: Closing task slot table
    at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.closeAsync(TaskSlotTableImpl.java:160)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:375)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
(…)
Then there are plenty of other errors.

Currently I have: high-availability.zookeeper.client.session-timeout: 300000
But between the first error message:
2020-11-12 14:33:43,548 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.
2020-11-12 14:33:43,563 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'
2020-11-12 14:33:43,564 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session
2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000
2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
2020-11-12 14:33:43,569 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:33:43,570 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
2020-11-12 14:33:43,592 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Xid out of order. Got Xid 37 with err 0 expected Xid 38 for a packet with details: clientPath:null serverPath:null finished:false header:: 38,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

and the last one, there is only one minute. Are there other parameters to adjust to make the ZooKeeper synchronization more robust when the network is slowed down?

Best,
Arnaud


From: Guowei Ma <gu...@gmail.com>
Sent: Tuesday, 17 November 2020 00:49
To: LINZ, Arnaud <AL...@bouyguestelecom.fr>
Cc: user <us...@flink.apache.org>
Subject: Re: Random Task executor shutdown

Hi, Arnaud
Would you like to share the log of the shutdown task executor?
BTW could you check the gc log of the task executor?
Best,
Guowei


On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a YARN cluster. I have a streaming application that, under heavy load, fails from time to time, with this as the only error message in the whole YARN log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried increasing the value of heartbeat.timeout, thinking that it might be due to a slow-responding mapper, but it did not solve the issue.

Best regards,
Arnaud


Re: Random Task executor shutdown

Posted by Guowei Ma <gu...@gmail.com>.
Hi, Arnaud
Would you like to share the log of the shutdown task executor?
BTW could you check the gc log of the task executor?
Best,
Guowei


On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> (reposted with proper subject line -- sorry for the copy/paste)
> -----Original message-----
> Hello,
>
> I'm running Flink 1.10 on a yarn cluster. I have a streaming application,
> that, when under heavy load, fails from time to time with this unique error
> message in the whole yarn log:
>
> (...)
> 2020-11-15 16:18:42,202 WARN
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received
> late message for now expired checkpoint attempt 63 from task
> 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at
> container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
> 2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager
>                  - Closing TaskExecutor connection
> container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is
> shutting down.
> 2020-11-15 16:18:55,087 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15)
> (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
>
>       at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
>         at
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>         at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-11-15 16:18:55,092 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - Calculating tasks to restart to recover the failed task
> 2f6467d98899e64a4721f0a7b6a059a8_6.
> 2020-11-15 16:18:55,101 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - 230 tasks should be restarted to recover the failed task
> 2f6467d98899e64a4721f0a7b6a059a8_6.
> (...)
>
> What could be the cause of this failure? Why is there no other error
> message?
>
> I've tried to increase the value of heartbeat.timeout, thinking that maybe
> it was due to a slow responding mapper, but it did not solve the issue.
>
> Best regards,
> Arnaud
>