You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Roger Hoover (JIRA)" <ji...@apache.org> on 2016/05/24 18:46:12 UTC

[jira] [Created] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

Roger Hoover created KAFKA-3752:
-----------------------------------

             Summary: Provide a way for KStreams to recover from unclean shutdown
                 Key: KAFKA-3752
                 URL: https://issues.apache.org/jira/browse/KAFKA-3752
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Roger Hoover
            Assignee: Guozhang Wang


If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM Killer), it may leave behind lock files and fail to recover.

It would be useful to have an options (say --force) to tell KStreams to proceed even if it finds old LOCK files.

{noformat}
[2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:583)
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /data/test/2/kafka-streams/test-2/0_0
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
	... 32 more
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)