You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Marcus Clendenin <ma...@gmail.com> on 2017/08/01 14:40:19 UTC

Kafka Streams Application crashing on Rebalance

Hi All,



I have a kafka streams application that is doing a join between a KTable
and a KStream and it seems that after it starts loading the KTable if I
either restart the application or start a new jar with the same
application-id it starts failing. It looks like when it tries to rejoin the
application-id and do a rebalance of the partitions it throws an error
regarding a null value coming from RocksDB. Any thoughts on where this is
coming from? I am running this inside of a docker container if that affects
anything but the RocksDB folder is mounted as a volume on the host machine.





Stacktrace:



2017-08-01 13:31:50,309 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Starting

2017-08-01 13:31:50,379 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.c.consumer.internals.AbstractCoordinator  - Discovered coordinator
xxxx.com:9092 (id: 2147483535 rack: null) for group test-application-id.

2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.c.consumer.internals.ConsumerCoordinator  - Revoking previously
assigned partitions [] for group test-application-id

2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] at state RUNNING: partitions [] revoked at the beginning
of consumer rebalance.

2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.

2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams org.apache.kafka.streams.KafkaStreams
- stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c]
State transition from RUNNING to REBALANCING.

2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Updating suspended tasks to contain active tasks []

2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Removing all active tasks []

2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Removing all standby tasks []

2017-08-01 13:31:50,389 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.c.consumer.internals.AbstractCoordinator  - (Re-)joining group
test-application-id

2017-08-01 13:31:50,416 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Constructed client metadata
{67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=ClientMetadata{hostInfo=null,
consumers=[test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c-StreamThread-1-consumer-f6ed6af8-0aee-4d2e-92a9-00955f7b3441],
state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member
subscriptions.

2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Completed validating internal topics in partition assignor

2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Completed validating internal topics in partition assignor

2017-08-01 13:31:50,419 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Assigned tasks to clients as
{67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=[activeTasks: ([0_0, 0_1, 0_2, 0_3,
0_4, 0_5]) assignedTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) prevActiveTasks:
([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 3.0]}.

2017-08-01 13:31:50,429 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.c.consumer.internals.AbstractCoordinator  - Successfully joined group
test-application-id with generation 56

2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.k.c.consumer.internals.ConsumerCoordinator  - Setting newly assigned
partitions [stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3,
stream_topic-4, stream_topic-5, table_topic-4, table_topic-5,
table_topic-0, table_topic-1, table_topic-2, table_topic-3] for group
test-application-id

2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] at state PARTITIONS_REVOKED: new partitions
[stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3,
stream_topic-4, stream_topic-5, table_topic-4, table_topic-5,
table_topic-0, table_topic-1, table_topic-2, table_topic-3] assigned at the
end of consumer rebalance.

2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] State transition from PARTITIONS_REVOKED to
ASSIGNING_PARTITIONS.

2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams org.apache.kafka.streams.KafkaStreams
- stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c]
State transition from REBALANCING to REBALANCING.

2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Creating active task 0_0 with assigned partitions
[stream_topic-0, table_topic-0]

2017-08-01 13:31:50,448 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamTask  - task [0_0] Initializing
state stores

2017-08-01 13:31:50,743 trackingId=X thread=[StreamThread-1] logType=ERROR

                module=kafka.streams
o.a.k.c.consumer.internals.ConsumerCoordinator  - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
test-application-id failed on partition assignment

java.lang.NullPointerException: null

            at org.rocksdb.RocksDB.put(RocksDB.java:488)

            at
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)

            at
org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)

            at
org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)

            at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)

            at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)

            at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)

            at
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)

            at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)

            at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)

            at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)

            at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)

            at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

            at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)

            at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)

            at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)

            at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)

            at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)

            at
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)

            at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)

            at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)

            at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)

            at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)

            at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)

            at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)

            at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)

            at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)

            at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

2017-08-01 13:31:50,752 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Shutting down

2017-08-01 13:31:50,753 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka
producer with timeoutMillis = 9223372036854775807 ms.

2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Removing all active tasks []

2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Removing all standby tasks []

2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Stream thread shutdown complete

2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=WARN

                module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Unexpected state transition from ASSIGNING_PARTITIONS to
NOT_RUNNING.

Re: Kafka Streams Application crashing on Rebalance

Posted by Eric Lalonde <er...@autonomic.ai>.
> On Aug 1, 2017, at 10:01 AM, Damian Guy <da...@gmail.com> wrote:
> 
> Hi, Yes the issue is in 0.10.2 also.

Hi,

Any chance of a backport to 0.10.2.x? We have lots and lots of state stores. :)


Re: Kafka Streams Application crashing on Rebalance

Posted by Marcus Clendenin <ma...@gmail.com>.
I'll work on moving to 0.11. Thanks

On Tue, Aug 1, 2017, 1:02 PM Damian Guy <da...@gmail.com> wrote:

> Hi, Yes the issue is in 0.10.2 also.
>
> On Tue, 1 Aug 2017 at 17:37 Eric Lalonde <er...@autonomic.ai> wrote:
>
> >
> > > On Aug 1, 2017, at 8:00 AM, Damian Guy <da...@gmail.com> wrote:
> > >
> > > It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
> > > https://issues.apache.org/jira/browse/KAFKA-4494
> >
> > Hi Damien, the Affects Version is set to 0.10.1.0 in KAFKA-4494. Is the
> > issue in 0.10.2.0 as well?
>

Re: Kafka Streams Application crashing on Rebalance

Posted by Damian Guy <da...@gmail.com>.
Hi, Yes the issue is in 0.10.2 also.

On Tue, 1 Aug 2017 at 17:37 Eric Lalonde <er...@autonomic.ai> wrote:

>
> > On Aug 1, 2017, at 8:00 AM, Damian Guy <da...@gmail.com> wrote:
> >
> > It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
> > https://issues.apache.org/jira/browse/KAFKA-4494
>
> Hi Damien, the Affects Version is set to 0.10.1.0 in KAFKA-4494. Is the
> issue in 0.10.2.0 as well?

Re: Kafka Streams Application crashing on Rebalance

Posted by Eric Lalonde <er...@autonomic.ai>.
> On Aug 1, 2017, at 8:00 AM, Damian Guy <da...@gmail.com> wrote:
> 
> It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
> https://issues.apache.org/jira/browse/KAFKA-4494

Hi Damien, the Affects Version is set to 0.10.1.0 in KAFKA-4494. Is the issue in 0.10.2.0 as well?

Re: Kafka Streams Application crashing on Rebalance

Posted by Damian Guy <da...@gmail.com>.
It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
https://issues.apache.org/jira/browse/KAFKA-4494

On Tue, 1 Aug 2017 at 15:40 Marcus Clendenin <ma...@gmail.com> wrote:

> Hi All,
>
>
>
> I have a kafka streams application that is doing a join between a KTable
> and a KStream and it seems that after it starts loading the KTable if I
> either restart the application or start a new jar with the same
> application-id it starts failing. It looks like when it tries to rejoin the
> application-id and do a rebalance of the partitions it throws an error
> regarding a null value coming from RocksDB. Any thoughts on where this is
> coming from? I am running this inside of a docker container if that affects
> anything but the RocksDB folder is mounted as a volume on the host machine.
>
>
>
>
>
> Stacktrace:
>
>
>
> 2017-08-01 13:31:50,309 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Starting
>
> 2017-08-01 13:31:50,379 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.c.consumer.internals.AbstractCoordinator  - Discovered coordinator
> xxxx.com:9092 (id: 2147483535 <(214)%20748-3535> rack: null) for group
> test-application-id.
>
> 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.c.consumer.internals.ConsumerCoordinator  - Revoking previously
> assigned partitions [] for group test-application-id
>
> 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning
> of consumer rebalance.
>
> 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
>
> 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams org.apache.kafka.streams.KafkaStreams
> - stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c]
> State transition from RUNNING to REBALANCING.
>
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Updating suspended tasks to contain active tasks []
>
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Removing all active tasks []
>
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Removing all standby tasks []
>
> 2017-08-01 13:31:50,389 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.c.consumer.internals.AbstractCoordinator  - (Re-)joining group
> test-application-id
>
> 2017-08-01 13:31:50,416 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Constructed client metadata
> {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=ClientMetadata{hostInfo=null,
>
> consumers=[test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c-StreamThread-1-consumer-f6ed6af8-0aee-4d2e-92a9-00955f7b3441],
> state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member
> subscriptions.
>
> 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Completed validating internal topics in partition assignor
>
> 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Completed validating internal topics in partition assignor
>
> 2017-08-01 13:31:50,419 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Assigned tasks to clients as
> {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=[activeTasks: ([0_0, 0_1, 0_2, 0_3,
> 0_4, 0_5]) assignedTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) prevActiveTasks:
> ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 3.0]}.
>
> 2017-08-01 13:31:50,429 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.c.consumer.internals.AbstractCoordinator  - Successfully joined group
> test-application-id with generation 56
>
> 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.k.c.consumer.internals.ConsumerCoordinator  - Setting newly assigned
> partitions [stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3,
> stream_topic-4, stream_topic-5, table_topic-4, table_topic-5,
> table_topic-0, table_topic-1, table_topic-2, table_topic-3] for group
> test-application-id
>
> 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] at state PARTITIONS_REVOKED: new partitions
> [stream_topic-0, stream_topic-1, stream_topic-2, stream_topic-3,
> stream_topic-4, stream_topic-5, table_topic-4, table_topic-5,
> table_topic-0, table_topic-1, table_topic-2, table_topic-3] assigned at the
> end of consumer rebalance.
>
> 2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] State transition from PARTITIONS_REVOKED to
> ASSIGNING_PARTITIONS.
>
> 2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams org.apache.kafka.streams.KafkaStreams
> - stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c]
> State transition from REBALANCING to REBALANCING.
>
> 2017-08-01 13:31:50,431 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Creating active task 0_0 with assigned partitions
> [stream_topic-0, table_topic-0]
>
> 2017-08-01 13:31:50,448 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamTask  - task [0_0] Initializing
> state stores
>
> 2017-08-01 13:31:50,743 trackingId=X thread=[StreamThread-1] logType=ERROR
>
>                 module=kafka.streams
> o.a.k.c.consumer.internals.ConsumerCoordinator  - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> test-application-id failed on partition assignment
>
> java.lang.NullPointerException: null
>
>             at org.rocksdb.RocksDB.put(RocksDB.java:488)
>
>             at
>
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
>
>             at
>
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
>
>             at
>
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
>
>             at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
>
>             at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>
>             at
>
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>
>             at
>
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
>
>             at
>
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>
>             at
>
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>
>             at
>
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>
>             at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>
>             at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>
>             at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>
>             at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>
>             at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>
>             at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>
>             at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>
>             at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>
> 2017-08-01 13:31:50,752 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Shutting down
>
> 2017-08-01 13:31:50,753 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
>
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Removing all active tasks []
>
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Removing all standby tasks []
>
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=INFO
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Stream thread shutdown complete
>
> 2017-08-01 13:31:50,766 trackingId=X thread=[StreamThread-1] logType=WARN
>
>                 module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Unexpected state transition from ASSIGNING_PARTITIONS to
> NOT_RUNNING.
>