Posted to dev@kafka.apache.org by "Damian Guy (JIRA)" <ji...@apache.org> on 2017/10/04 18:07:00 UTC

[jira] [Resolved] (KAFKA-4890) State directory being deleted when another thread holds the lock

     [ https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Damian Guy resolved KAFKA-4890.
-------------------------------
    Resolution: Duplicate

> State directory being deleted when another thread holds the lock
> ----------------------------------------------------------------
>
>                 Key: KAFKA-4890
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4890
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Damian Guy
>         Attachments: logs2.tar.gz, logs3.tar.gz, logs.tar.gz
>
>
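> It looks like a state directory is being cleaned up while another thread still holds its lock. In the log below, StreamThread-1 registers perGameScoreStore for task 0_6, StreamThread-3 then deletes the obsolete state directory for task 0_6, and StreamThread-1 fails while restoring that store: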
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
>         at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
>         at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
>         at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
>         at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.rocksdb.RocksDBException: `
>         at org.rocksdb.RocksDB.put(Native Method)
>         at org.rocksdb.RocksDB.put(RocksDB.java:488)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
>         ... 27 common frames omitted
> {code}
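> The same happens with a standby task: StreamThread-4 deletes the state directory for task 0_2, and shortly afterwards StreamThread-2 fails to flush lifetimeScoreStore while committing StandbyTask 0_2: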
> {code}
> 2017-03-12 20:46:58 [StreamThread-4] INFO  o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
> ...
> 2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to flush state store lifetimeScoreStore
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
>         at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:767)
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store lifetimeScoreStore
>         at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:346)
>         at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:337)
>         at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>         at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
>         ... 6 common frames omitted
> Caused by: org.rocksdb.RocksDBException: a
>         at org.rocksdb.RocksDB.flush(Native Method)
>         at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> {code}
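Both logs show a cleanup thread deleting a task directory that another stream thread in the same process was still using. One JDK detail worth keeping in mind when reasoning about this: per the java.nio.channels.FileLock Javadoc, file locks are held on behalf of the entire JVM, not a thread, and are not suitable for controlling access by multiple threads within one JVM. The minimal sketch below only demonstrates that JDK behaviour (FileLockScopeDemo and secondLockRejected are hypothetical names, and the temp directory merely stands in for a task directory): while one channel holds the lock, tryLock on a second channel for the same file throws OverlappingFileLockException. A scheme built only on a task's lock file therefore cannot tell which thread owns the task; that has to be tracked separately.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileLockScopeDemo {
    // Opens two independent channels on the same lock file. While the first holds an
    // exclusive lock, tryLock on the second is refused because this JVM already holds
    // an overlapping lock (the Javadoc states this regardless of which thread asks).
    static boolean secondLockRejected(Path lockFile) throws IOException {
        try (FileChannel first = FileChannel.open(lockFile,
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileChannel second = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
            FileLock held = first.tryLock(); // null only if another process holds it
            if (held == null) {
                throw new IllegalStateException("lock file is held by another process");
            }
            try {
                second.tryLock();
                return false;
            } catch (OverlappingFileLockException e) {
                return true;
            } finally {
                held.release();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path taskDir = Files.createTempDirectory("0_6");
        boolean rejected = secondLockRejected(taskDir.resolve(".lock"));
        System.out.println("second lock in same JVM rejected: " + rejected);
        // prints: second lock in same JVM rejected: true
    }
}
```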
> Operating system info:
> Distributor ID: Debian
> Description:    Debian GNU/Linux 8.7 (jessie)
> Release:        8.7
> Codename:       jessie
> uname: 3.16.0-4-amd64
> FWIW, I don't see an obvious cause, and I can't reproduce it.
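Since the race could not be reproduced from timing alone, here is a minimal sketch of the invariant the cleanup should respect, with the interleaving from the first log forced deterministically. It assumes a per-process registry that records which thread owns each task directory; TaskDirectoryLocks, lock, unlock and cleanUnlocked are hypothetical names, not Kafka's StateDirectory API, and the checks that make a directory "obsolete" (task no longer assigned, cleanup delay elapsed) are omitted. The cleaner deletes a directory only after taking its lock itself, so a directory owned by another thread is skipped. CountDownLatch keeps the owner thread holding task 0_6 while the cleaner runs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, not Kafka's StateDirectory: tracks which thread owns each task directory.
public class TaskDirectoryLocks {
    private final Path stateDir;
    // taskId (directory name, e.g. "0_6") -> thread that currently owns it
    private final Map<String, Thread> owners = new ConcurrentHashMap<>();

    public TaskDirectoryLocks(Path stateDir) { this.stateDir = stateDir; }

    // Non-reentrant: returns false if any thread, including the caller, already owns the task.
    public boolean lock(String taskId) {
        return owners.putIfAbsent(taskId, Thread.currentThread()) == null;
    }

    // Only the owning thread can release; calls from other threads are ignored.
    public void unlock(String taskId) {
        owners.remove(taskId, Thread.currentThread());
    }

    // Deletes every task directory the caller can lock; directories owned by another
    // thread are skipped. Returns how many directories were deleted.
    public int cleanUnlocked() throws IOException {
        List<Path> taskDirs;
        try (Stream<Path> children = Files.list(stateDir)) {
            taskDirs = children.filter(Files::isDirectory).collect(Collectors.toList());
        }
        int deleted = 0;
        for (Path dir : taskDirs) {
            String taskId = dir.getFileName().toString();
            if (!lock(taskId)) {
                continue; // in use by a stream thread: never delete under it
            }
            try {
                deleteRecursively(dir);
                deleted++;
            } finally {
                unlock(taskId);
            }
        }
        return deleted;
    }

    private static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order puts every entry before the directory that contains it.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws Exception {
        Path stateDir = Files.createTempDirectory("streams-state");
        Files.createDirectories(stateDir.resolve("0_6"));
        Files.createDirectories(stateDir.resolve("0_2"));
        TaskDirectoryLocks locks = new TaskDirectoryLocks(stateDir);
        CountDownLatch ownerHasLock = new CountDownLatch(1);
        CountDownLatch cleanupDone = new CountDownLatch(1);
        // Stands in for StreamThread-1, which holds task 0_6 while it restores its store.
        Thread owner = new Thread(() -> {
            locks.lock("0_6");
            ownerHasLock.countDown();
            try {
                cleanupDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            locks.unlock("0_6");
        }, "StreamThread-1");
        owner.start();
        ownerHasLock.await();
        // The main thread plays the cleaner (StreamThread-3 in the first log).
        int deleted = locks.cleanUnlocked();
        cleanupDone.countDown();
        owner.join();
        System.out.println("deleted=" + deleted
                + " 0_6 exists=" + Files.exists(stateDir.resolve("0_6"))
                + " 0_2 exists=" + Files.exists(stateDir.resolve("0_2")));
        // prints: deleted=1 0_6 exists=true 0_2 exists=false
    }
}
```

Making lock non-reentrant means the cleaner also skips directories the calling thread itself owns, which is the conservative choice for a code path that deletes data.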



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)