Posted to dev@kafka.apache.org by "Greg Fodor (JIRA)" <ji...@apache.org> on 2016/05/26 17:35:12 UTC

[jira] [Created] (KAFKA-3758) KStream job fails to recover after Kafka broker stopped

Greg Fodor created KAFKA-3758:
---------------------------------

             Summary: KStream job fails to recover after Kafka broker stopped
                 Key: KAFKA-3758
                 URL: https://issues.apache.org/jira/browse/KAFKA-3758
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Greg Fodor
            Assignee: Guozhang Wang


We've been testing a fairly complex Kafka Streams (KStream) job, and under load the job appears to fail to rebalance and recover when we shut down one of the Kafka brokers. The test ran against a 3-node Kafka cluster in which every topic had a replication factor of at least 2, and we terminated one of the nodes.

The full log is attached. The root exception appears to be contention on the state directory lock. The job keeps trying to recover but repeatedly throws lock-related errors. Restarting the job itself resolves the problem.

org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /muon/state/job-stream_photon_messages-1/2_82
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
        ... 32 more
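
For anyone trying to reason about the "Failed to lock the state directory" error in isolation, here is a minimal sketch of how a directory lock can fail while an earlier holder has not yet released it. It assumes the lock is an exclusive java.nio file lock on a lock file inside the directory. The class name StateDirLockSketch, the helper tryLockDir, and the ".lock" file name are hypothetical and chosen for illustration. This is not the Kafka Streams source, and it does not claim to reproduce the exact cause of this issue.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class StateDirLockSketch {

    // Hypothetical helper: try to take an exclusive lock on <dir>/.lock.
    // Returns null when the lock is already held, either by another
    // process (tryLock returns null) or by this JVM (tryLock throws).
    static FileLock tryLockDir(File dir) throws IOException {
        File lockFile = new File(dir, ".lock");
        FileChannel channel = FileChannel.open(lockFile.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                channel.close();
            }
            return lock;
        } catch (OverlappingFileLockException e) {
            // The same JVM already holds a lock on this file.
            channel.close();
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Temporary directory standing in for a per-task state directory.
        File dir = Files.createTempDirectory("state-dir-sketch").toFile();

        FileLock first = tryLockDir(dir);
        FileLock second = tryLockDir(dir); // attempted while 'first' is still held
        System.out.println("first acquired:  " + (first != null));
        System.out.println("second acquired: " + (second != null));

        // Once the first holder releases, a new attempt can succeed.
        first.release();
        first.channel().close();
        FileLock third = tryLockDir(dir);
        System.out.println("after release:   " + (third != null));
        third.release();
        third.channel().close();
    }
}
```

In this sketch the second attempt fails until the first lock is released. If something similar happens during a rebalance, where a task is re-created before the previous owner of its state directory has released the lock, every retry would fail the same way until the process is restarted, which would be consistent with the behavior described above.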
