Posted to issues@flink.apache.org by "Zhenzhong Xu (JIRA)" <ji...@apache.org> on 2017/08/09 23:50:00 UTC

[jira] [Commented] (FLINK-7278) Flink job can stuck while ZK leader reelected during ZK cluster migration

    [ https://issues.apache.org/jira/browse/FLINK-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120845#comment-16120845 ] 

Zhenzhong Xu commented on FLINK-7278:
-------------------------------------

[~trohrmann@apache.org] Unfortunately, we don't have logs for this instance any more. However, I think this is the only occurrence we have observed. Let me know if there is any information I can help with.

> Flink job can stuck while ZK leader reelected during ZK cluster migration 
> --------------------------------------------------------------------------
>
>                 Key: FLINK-7278
>                 URL: https://issues.apache.org/jira/browse/FLINK-7278
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>            Reporter: Zhenzhong Xu
>            Priority: Minor
>
> We have observed a potential failure case while a Flink job was running during a ZK migration. The scenario is described below.
> 1. The Flink cluster was running in standalone mode on the Netflix Titus container runtime.
> 2. We performed a ZK migration by rolling out a new OS image one node at a time.
> 3. During ZK leader re-election, the Flink cluster started to exhibit failures and eventually ended up in a non-recoverable failure mode.
> 4. This behavior does not reproduce every time; it may be caused by a race condition in an edge case.
> Below is a list of error messages ordered by event time:
> 2017-07-22 02:47:44,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Source -> Sink: Sink (67/176) (0442d63c89809ad86f38874c845ba83f) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='f519795dfabcecfd7863ed587efdb398'} @ titus-123072-worker-3-39 (dataPort=46879)
> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
> at org.apache.flink.runtime.instance.InstanceManager.unregisterAllTaskManagers(InstanceManager.java:234)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:330)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2017-07-22 02:47:44,621 WARN com.netflix.spaas.runtime.FlinkJobManager - Discard message LeaderSessionMessage(7a247ad9-531b-4f27-877b-df41f9019431,Disconnect(0b300c04592b19750678259cd09fea95,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because the expected leader session ID None did not equal the received leader session ID Some(7a247ad9-531b-4f27-877b-df41f9019431).
> zxu Zhenzhong Xu added a comment - 07/26/2017 09:24 PM
> 2017-07-22 02:47:45,015 WARN netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 0x2579bebfd265054 for server 100.83.64.121/100.83.64.121:2181, unexpected error, closing socket connection and attempting reconnect
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> zxu Zhenzhong Xu added a comment - 07/26/2017 09:25 PM
> 2017-07-22 02:47:44,557 ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
> java.lang.InterruptedException
> at java.lang.Object.wait(Native Method)
> at java.lang.Thread.join(Thread.java:1260)
> at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
> at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
> at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:320)
> at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:431)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:748)
> 2017-07-22 02:47:44,663 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.kafka.common.KafkaException: Failed to close kafka producer
> at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:732)
> at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
> at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:320)
> at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:431)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
> at java.lang.Object.wait(Native Method)
> at java.lang.Thread.join(Thread.java:1260)
> at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
> ... 9 more
> 2017-07-22 02:47:45,079 WARN netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 0x35841491f044692 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> 2017-07-22 02:47:59,521 ERROR org.apache.flink.shaded.org.apache.curator.ConnectionState - Connection timed out for connection string (100.83.64.121:2181,100.83.104.81:2181,100.83.135.236:2181,100.83.146.196:2181,100.83.17.206:2181) and timeout (15000) / elapsed (15002)
> org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
> at org.apache.flink.shaded.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
> at org.apache.flink.shaded.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
> at org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:806)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> 2017-07-22 02:48:24,523 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
> netflix.spaas.shaded.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
> at netflix.spaas.shaded.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
> at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> 2017-07-22 02:49:34,592 ERROR org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/jobmanager)]] after [10000 ms]
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)