Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2016/05/24 21:31:12 UTC

[jira] [Commented] (KAFKA-3752) Provide a way for KStreams to recover from unclean shutdown

    [ https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15298949#comment-15298949 ] 

Guozhang Wang commented on KAFKA-3752:
--------------------------------------

We add locks on the state directory only for the case where multiple stream threads run on the same machine, and one thread is accessing (hence owning) the directory while another is cleaning it. We should revisit this issue and see if there is a better solution than using locks here.
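For illustration, here is a minimal sketch of the kind of per-directory locking described above, built on java.nio file locks. This is not the actual Kafka Streams implementation; the StateDirLock class name and the ".lock" file name are assumptions for this example.

{noformat}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// One lock file per task state directory, so a thread cleaning the directory
// and a thread owning it cannot overlap (illustrative sketch only).
public class StateDirLock implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    private StateDirLock(FileChannel channel, FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    // Returns null if another thread or process already holds the lock.
    public static StateDirLock tryAcquire(Path stateDir) throws IOException {
        Path lockFile = stateDir.resolve(".lock");   // assumed lock file name
        FileChannel channel = FileChannel.open(
                lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();       // non-blocking attempt
            if (lock == null) {                      // held by another process
                channel.close();
                return null;
            }
            return new StateDirLock(channel, lock);
        } catch (OverlappingFileLockException e) {   // held within this JVM
            channel.close();
            return null;
        }
    }

    @Override
    public void close() throws IOException {
        lock.release();
        channel.close();
    }
}
{noformat}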

> Provide a way for KStreams to recover from unclean shutdown
> -----------------------------------------------------------
>
>                 Key: KAFKA-3752
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3752
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Roger Hoover
>            Assignee: Guozhang Wang
>              Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an option (say --force) to tell KStreams to proceed even if it finds old LOCK files; a rough cleanup sketch follows the stack trace below.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> Caused by: java.io.IOException: Failed to lock the state directory: /data/test/2/kafka-streams/test-2/0_0
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
> 	... 32 more
> {noformat}
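The --force option proposed above does not exist yet. As a rough sketch of what such cleanup could look like on the application side, the following deletes leftover ".lock" files under a state directory before KafkaStreams is started. The class name, the ".lock" file name, and the assumption that no other instance is using the directory are all illustrative, not part of the Kafka Streams API.

{noformat}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical pre-start cleanup: delete leftover ".lock" files under the
// state directory. Only safe when no other instance or thread can be using
// the same directory; file name and layout are assumptions for illustration.
public class StaleLockCleanup {

    public static void removeStaleLocks(Path stateDir) throws IOException {
        if (!Files.isDirectory(stateDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(stateDir)) {
            paths.filter(p -> p.getFileName() != null
                           && p.getFileName().toString().equals(".lock"))
                 .forEach(p -> {
                     try {
                         Files.deleteIfExists(p);
                     } catch (IOException e) {
                         System.err.println("Could not delete " + p + ": " + e);
                     }
                 });
        }
    }

    public static void main(String[] args) throws IOException {
        // e.g. the application state directory from the stack trace above
        removeStaleLocks(Paths.get("/data/test/2/kafka-streams/test-2"));
    }
}
{noformat}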



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)