Posted to dev@flink.apache.org by Márton Balassi <ba...@gmail.com> on 2015/06/01 16:52:01 UTC

ResendUnfulfillableException at the end of failing job

While experimenting in a cluster setting, I experienced some hardware
failures that caused some TaskManagers to be unregistered and, as a result,
my streaming jobs to fail. In the logs, after the TaskManager dies, I see
some Akka exceptions. I think they are harmless compared to losing
TaskManagers; I just wanted to report them.

20:26:17,813 WARN  Remoting
     - Tried to associate with unreachable remote address [akka.tcp://
flink@127.0.0.1:56910]. Address is now gated for 5000 ms, all messages to
this address will be delivered to dead letters. Reason: Connection refused:
/127.0.0.1:56910
20:26:22,811 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - GroupedActiveDiscretizer -> BasicWindowBuffer ->
GroupedValues-partial -> GroupedValues-total -> Window Flatten ->
FormatCounts -> WriteCounts (10/20) (e691d84be7c1ab95bcab738b743dc299)
switched from CANCELING to CANCELED
20:27:20,683 WARN  Remoting
     - Tried to associate with unreachable remote address [akka.tcp://
flink@10.240.251.253:42117]. Address is now gated for 5000 ms, all messages
to this address will be delivered to dead letters. Reason: Connection
refused: /10.240.251.253:42117
20:29:00,702 WARN  Remoting
     - Tried to associate with unreachable remote address [akka.tcp://
flink@10.240.251.253:42117]. Address is now gated for 5000 ms, all messages
to this address will be delivered to dead letters. Reason: Connection
refused: /10.240.251.253:42117
20:30:19,682 WARN  akka.remote.ReliableDeliverySupervisor
     - Association with remote system [akka.tcp://flink@10.240.172.202:36898]
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
20:30:19,779 WARN  Remoting
     - Tried to associate with unreachable remote address
[akka.tcp://flink@10.240.172.202:36898]. Address is now gated for 5000 ms, all
messages to this address will be delivered to dead letters. Reason: The
remote system has quarantined this system. No further associations to the
remote system are possible until this system is restarted.
20:30:19,779 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Task manager akka.tcp://flink@10.240.172.202:36898/user/taskmanager
terminated.
20:30:19,779 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - GroupedActiveDiscretizer -> BasicWindowBuffer -> GroupByKeyOnly ->
Window Flatten -> GroupAlsoByWindow (19/20)
(e003610224684be03180e4f101c3367a) switched from CANCELING to FAILED
20:30:19,780 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - ReadLines -> Tokenizer -> Init -> ReifyTimestampsAndWindows (20/20)
(e37268a9a671717f1cf9177e9372a861) switched from CANCELING to FAILED
20:30:19,781 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - GroupedActiveDiscretizer -> BasicWindowBuffer -> Sum.PerKey-partial
-> Sum.PerKey-total -> Window Flatten (19/20)
(4eab0b82cfc266c190fc63569644b77e) switched from CANCELING to FAILED
20:30:19,781 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - GroupedActiveDiscretizer -> BasicWindowBuffer -> Sum.PerKey-partial
-> Sum.PerKey-total -> Window Flatten (20/20)
(11656d30edd03a00ffda0f557221e152) switched from CANCELING to FAILED
20:30:19,782 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - GroupedActiveDiscretizer -> BasicWindowBuffer -> GroupByKeyOnly ->
Window Flatten -> GroupAlsoByWindow (20/20)
(8dbde0fe41675032a7052df696c7f67d) switched from CANCELING to FAILED
20:30:19,782 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
     - ReadLines -> Tokenizer -> Init -> ReifyTimestampsAndWindows (10/20)
(794ee1f56dea331b74bb27dd76579917) switched from CANCELING to FAILED
20:30:19,783 INFO  org.apache.flink.runtime.instance.InstanceManager
      - Unregistered task manager akka.tcp://flink@10.240.172.202:36898.
Number of registered task managers 8. Number of available slots 16.
20:30:19,789 WARN  Remoting
     - Tried to associate with unreachable remote address [akka.tcp://
flink@127.0.0.1:56910]. Address is now gated for 5000 ms, all messages to
this address will be delivered to dead letters. Reason: Connection refused:
/127.0.0.1:56910
20:30:27,919 INFO  org.apache.flink.runtime.instance.InstanceManager
      - Registering TaskManager at akka.tcp://
flink@10.240.172.202:36898/user/taskmanager which was marked as dead
earlier because of a heart-beat timeout.
20:30:27,919 INFO  org.apache.flink.runtime.instance.InstanceManager
      - Registered TaskManager at dataflow-benchmark-worker7 (akka.tcp://
flink@10.240.172.202:36898/user/taskmanager) as
56cca34b618e37faa010d46079ff3968. Current number of registered hosts is 9.
20:30:33,080 ERROR Remoting
     - Error encountered while processing system message acknowledgement
[4, 5] ACK[5, {3, 2, 1, 0}]
akka.remote.transport.Transport$InvalidAssociationException: Error
encountered while processing system message acknowledgement [4, 5] ACK[5,
{3, 2, 1, 0}]
Caused by: akka.remote.ResendUnfulfillableException: Unable to fulfill
resend request since negatively acknowledged payload is no longer in
buffer. The resend states between two systems are compromised and cannot be
recovered.
        at akka.remote.AckedSendBuffer.acknowledge(AckedDelivery.scala:103)
        at
akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:288)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at
akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:185)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
20:30:33,085 INFO  org.apache.flink.runtime.jobmanager.JobManager
     - Task manager akka.tcp://flink@10.240.172.202:36898/user/taskmanager
terminated.
20:30:33,086 INFO  org.apache.flink.runtime.instance.InstanceManager
      - Unregistered task manager akka.tcp://flink@10.240.172.202:36898.
Number of registered task managers 8. Number of available slots 16.
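
For readers unfamiliar with the final error in the log above, here is a
minimal Python sketch of the general idea behind an acknowledged send buffer.
It is a simplified model, not Akka's implementation: the class and method
names (SendBuffer, acknowledge, ResendUnfulfillableError) are hypothetical,
and reading "[4, 5] ACK[5, {3, 2, 1, 0}]" as "buffer holds 4 and 5, cumulative
ack is 5, negative acks are 0 to 3" is an assumption based on the wording of
the exception message.

```python
from dataclasses import dataclass, field


class ResendUnfulfillableError(Exception):
    """Raised when a negatively acknowledged message is no longer buffered."""


@dataclass
class SendBuffer:
    # Sequence number -> payload for messages sent but not yet acknowledged.
    unacked: dict = field(default_factory=dict)

    def send(self, seq, payload):
        """Remember a sent message until the receiver acknowledges it."""
        self.unacked[seq] = payload

    def acknowledge(self, cumulative_ack, nacks=frozenset()):
        """Apply an ACK and return the payloads that must be resent.

        cumulative_ack: highest sequence number the receiver has seen.
        nacks: sequence numbers the receiver reports as missing.
        """
        missing = sorted(seq for seq in nacks if seq not in self.unacked)
        if missing:
            # The receiver asks for messages the sender has already dropped,
            # so the two sides can no longer agree on the delivery state.
            raise ResendUnfulfillableError(
                f"cannot resend {missing}: no longer in buffer"
            )
        resend = [self.unacked[seq] for seq in sorted(nacks)]
        # Keep only messages that are newer than the cumulative ack or
        # explicitly reported as missing.
        self.unacked = {
            seq: payload
            for seq, payload in self.unacked.items()
            if seq > cumulative_ack or seq in nacks
        }
        return resend


if __name__ == "__main__":
    buf = SendBuffer()
    buf.send(4, "m4")
    buf.send(5, "m5")
    try:
        # Same shape as the logged case: buffer [4, 5], ACK[5, {3, 2, 1, 0}].
        buf.acknowledge(5, {3, 2, 1, 0})
    except ResendUnfulfillableError as err:
        print("unrecoverable:", err)
```

Under these assumptions, the sender has already discarded messages 0 to 3 when
the receiver asks for them again, which matches the "resend states between two
systems are compromised" wording in the exception.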

Re: ResendUnfulfillableException at the end of failing job

Posted by Ufuk Celebi <uc...@apache.org>.
Thanks for posting this. From what I've found online, this seems to be an Akka problem. For example, there is this PR that addresses it: https://github.com/akka/akka/issues/16623

Can you check whether this should have been fixed in the Akka version we are using and post to the Akka ML if so?

Have you tried reproducing the problem? It's probably not easy/possible...

– Ufuk
