Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2016/05/02 06:20:12 UTC

[jira] [Updated] (KAFKA-3642) Fix NPE from ProcessorStateManager when the changelog topic not exists

     [ https://issues.apache.org/jira/browse/KAFKA-3642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-3642:
---------------------------------
    Reviewer: Guozhang Wang

> Fix NPE from ProcessorStateManager when the changelog topic not exists
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-3642
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3642
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>              Labels: architecture
>             Fix For: 0.10.1.0
>
>
> # Fix NPE from ProcessorStateManager when the changelog topic not exists
> When both of the following conditions are satisfied, ProcessorStateManager throws an NPE:
> - A state store is configured with logging enabled but the corresponding -changelog topic does not exist,
> - zookeeper.connect wasn't supplied in the streams config.
> So Streams should:
> - expect that the -changelog topic may not exist and throw a more meaningful exception.
> - warn users if no -changelog topic has been prepared and zookeeper.connect wasn't supplied either.
> BTW, I think making zookeeper.connect a mandatory argument could be another option if it doesn't hurt.
> {code}
> $ git diff
> diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
> index 34c35b7..c5339f1 100644
> --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
> +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
> @@ -108,7 +108,7 @@ public class WordCountProcessorDemo {
>          Properties props = new Properties();
>          props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
>          props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> -        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> +        // props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>          props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>          props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>  
> $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --list 2>/dev/null | grep '\-changelog'
> $ ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo 
> ...
> [2016-04-30 02:25:04,960] ERROR User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-wordcount-processor failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> java.lang.NullPointerException
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:189)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
>         at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:64)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:582)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:609)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:126)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:226)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:221)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:430)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:416)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:673)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:652)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:397)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:338)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:191)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:161)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:237)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:343)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:902)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:864)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:327)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250)
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:331)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250)
> Caused by: java.lang.NullPointerException
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:189)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116)
>         at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:64)
>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:582)
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:609)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:71)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:126)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:226)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:221)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:430)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:416)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:673)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:652)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:397)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:338)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:191)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:161)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:237)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:343)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:902)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:864)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:327)
>         ... 1 more
> {code}
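
The first suggestion in the description (expect that the -changelog topic may be missing and fail with a clearer message) might look roughly like the sketch below. It is not the actual patch and does not use Kafka classes: the map standing in for cluster metadata, the MissingChangelogTopicException type, the method names, and the "Counts" store name are all hypothetical and exist only for illustration. It also assumes that the changelog topic is named <application.id>-<store name>-changelog and that Streams can only create the topic itself when zookeeper.connect is supplied, which is what the description implies.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangelogGuardSketch {

    /** Hypothetical exception type for this sketch; not a class from Kafka. */
    static final class MissingChangelogTopicException extends RuntimeException {
        MissingChangelogTopicException(String message) {
            super(message);
        }
    }

    /** Assumed naming scheme: <application.id>-<store name>-changelog. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    /**
     * Looks up the partitions of a store's changelog topic.
     * The map stands in for cluster metadata; a missing key models a lookup
     * that returns null for a topic that does not exist.
     */
    static List<Integer> changelogPartitions(Map<String, List<Integer>> topicPartitions,
                                             String applicationId,
                                             String storeName,
                                             boolean zookeeperConnectSupplied) {
        String topic = changelogTopic(applicationId, storeName);
        List<Integer> partitions = topicPartitions.get(topic);
        if (partitions == null) {
            // Fail with an actionable message instead of dereferencing null later.
            String hint = zookeeperConnectSupplied
                    ? "the topic could not be found even though zookeeper.connect is set"
                    : "zookeeper.connect is not set, so the topic cannot be created automatically; "
                      + "create it manually or supply zookeeper.connect";
            throw new MissingChangelogTopicException(
                    "Changelog topic '" + topic + "' for store '" + storeName
                    + "' does not exist: " + hint);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Metadata that knows about some topic, but not the changelog topic.
        Map<String, List<Integer>> topicPartitions = new HashMap<>();
        topicPartitions.put("some-other-topic", Arrays.asList(0, 1));
        try {
            changelogPartitions(topicPartitions, "streams-wordcount-processor", "Counts", false);
        } catch (MissingChangelogTopicException e) {
            // The message names the missing topic and the missing config key.
            System.err.println(e.getMessage());
        }
    }
}
```

The point of the guard is that the failure names the missing topic and the relevant config key at the place where the lookup happens, rather than surfacing as a bare NullPointerException deep inside the rebalance callback.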


