You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Andres Gomez Ferrer (JIRA)" <ji...@apache.org> on 2017/09/22 07:18:00 UTC

[jira] [Created] (KAFKA-5961) NullPointerException when restore messages with null key.

Andres Gomez Ferrer created KAFKA-5961:
------------------------------------------

             Summary: NullPointerException when restore messages with null key.
                 Key: KAFKA-5961
                 URL: https://issues.apache.org/jira/browse/KAFKA-5961
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.0, 0.10.2.1
            Reporter: Andres Gomez Ferrer


If you have a kafka streams that use:

{code:java}
stream.table("topicA")
{code}

When the application is running if you send a message with a null key, it works fine. Later, if you restart the application when the restore consumer starts to read the topicA from the beginning, it crashes because doesn't filter the null key.

I know that isn't normal send a null key to a topic that is a table topic, but maybe sometimes can happen .. and I think that kafka streams could protect it self.

This is the stack trace:

{code}
ConsumerCoordinator [ERROR] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group my-cep-app_enricher failed on partition assignment
java.lang.NullPointerException
	at org.rocksdb.RocksDB.put(RocksDB.java:488)
	at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
	at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
	at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
	at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)