Posted to dev@kafka.apache.org by "mayi_hetu (JIRA)" <ji...@apache.org> on 2016/12/14 11:15:58 UTC
[jira] [Commented] (KAFKA-4536) Kafka clients throw
NullPointerException on poll when delete the relative topic
[ https://issues.apache.org/jira/browse/KAFKA-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748056#comment-15748056 ]
mayi_hetu commented on KAFKA-4536:
----------------------------------
Following the code, I found that there is a MetadataListener in the {color:red}ConsumerCoordinator{color}.
In its {color:red}onMetadataUpdate{color} method, it calls *changeSubscription* and then *setTopics* on the metadata:
*subscriptions.changeSubscription(topicsToSubscribe);*
*metadata.setTopics(subscriptions.groupSubscription());*
The changeSubscription method only ever adds topicsToSubscribe to groupSubscription (addAll); nothing is removed from it:
*this.subscription.clear();*
*this.subscription.addAll(topicsToSubscribe);*
*this.groupSubscription.addAll(topicsToSubscribe);*
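To make the effect of those three lines concrete, here is a minimal sketch of the same bookkeeping with plain Java sets. It is not the Kafka source: the class SubscriptionSketch and the second topic name test2.sh are made up for illustration, and only the body of changeSubscription copies the lines quoted above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the client's subscription state; not Kafka code.
final class SubscriptionSketch {
    final Set<String> subscription = new HashSet<>();
    final Set<String> groupSubscription = new HashSet<>();

    // Same three statements as quoted above: subscription is replaced,
    // groupSubscription only grows.
    void changeSubscription(Set<String> topicsToSubscribe) {
        this.subscription.clear();
        this.subscription.addAll(topicsToSubscribe);
        this.groupSubscription.addAll(topicsToSubscribe);
    }

    public static void main(String[] args) {
        SubscriptionSketch s = new SubscriptionSketch();
        // First metadata update: both topics match the pattern.
        s.changeSubscription(new HashSet<>(Arrays.asList("test1.sh", "test2.sh")));
        // Second metadata update: test1.sh has been deleted on the broker.
        s.changeSubscription(new HashSet<>(Arrays.asList("test2.sh")));

        System.out.println(s.subscription.contains("test1.sh"));      // false
        System.out.println(s.groupSubscription.contains("test1.sh")); // true: stale entry
    }
}
```

Under these assumptions, the deleted topic drops out of subscription but stays in groupSubscription, which is the set that is then passed to metadata.setTopics.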
So at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:271), in the line
*partitionInfos.addAll(cluster.partitionsForTopic(topic));*
{color:red}the topics still include the deleted topic, but the cluster no longer has any partitions for it{color}, so cluster.partitionsForTopic(topic) returns null. Passing that null to addAll is what throws the NullPointerException seen at java.util.ArrayList.addAll in the stack trace.
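The failure mode can be reproduced without a broker. The sketch below is an illustration under assumptions, not the Kafka implementation and not a proposed patch: StaleTopicSketch, collectUnguarded, collectGuarded and the partition labels are hypothetical, and a plain Map stands in for the Cluster object, returning null for a topic it does not know.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the metadata lookup; not Kafka code.
final class StaleTopicSketch {

    // Mimics the behaviour described above: null for an unknown topic.
    static List<String> partitionsForTopic(Map<String, List<String>> cluster, String topic) {
        return cluster.get(topic);
    }

    // Same shape as the failing line: addAll(null) throws NullPointerException.
    static List<String> collectUnguarded(Set<String> topics, Map<String, List<String>> cluster) {
        List<String> partitionInfos = new ArrayList<>();
        for (String topic : topics) {
            partitionInfos.addAll(partitionsForTopic(cluster, topic));
        }
        return partitionInfos;
    }

    // One possible guard (an assumption): skip topics the cluster no longer knows.
    static List<String> collectGuarded(Set<String> topics, Map<String, List<String>> cluster) {
        List<String> partitionInfos = new ArrayList<>();
        for (String topic : topics) {
            List<String> partitions = partitionsForTopic(cluster, topic);
            if (partitions != null) {
                partitionInfos.addAll(partitions);
            }
        }
        return partitionInfos;
    }

    public static void main(String[] args) {
        // The cluster only knows test2.sh; test1.sh has been deleted.
        Map<String, List<String>> cluster = new HashMap<>();
        cluster.put("test2.sh", Arrays.asList("test2.sh-0", "test2.sh-1"));
        // The topic set still carries the deleted topic.
        Set<String> topics = new HashSet<>(Arrays.asList("test1.sh", "test2.sh"));

        try {
            collectUnguarded(topics, cluster);
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup threw NullPointerException");
        }
        System.out.println(collectGuarded(topics, cluster).size()); // 2
    }
}
```

Whether the right fix is a null check at the lookup or pruning deleted topics from groupSubscription is a design question this sketch does not settle; it only shows where the null comes from.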
> Kafka clients throw NullPointerException on poll when delete the relative topic
> -------------------------------------------------------------------------------
>
> Key: KAFKA-4536
> URL: https://issues.apache.org/jira/browse/KAFKA-4536
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.0.1
> Reporter: mayi_hetu
>
> 1. new KafkaConsumer
> val groupIdString = "test1"
> val props = new Properties();
> props.put("bootstrap.servers", "99.12.143.240:9093");
> props.put("group.id", groupIdString);
> props.put("enable.auto.commit", "false");
> props.put("auto.offset.reset","earliest");
> props.put("auto.commit.interval.ms", "5000");
> props.put("metadata.max.age.ms","300000");
> props.put("session.timeout.ms", "30000");
> props.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
> props.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
> props.setProperty("client.id", groupIdString)
> new KafkaConsumer[Array[Byte], Array[Byte]](props)
> 2. *subscribe to topics through a Pattern*
> consumer.subscribe(Pattern.compile(".*\\.sh$"), consumerRebalanceListener)
> 3. use poll(1000) to fetch messages
> 4. delete the topic test1.sh on the Kafka broker
> then the consumer throws a NullPointerException
> {color:red}
> Exception in thread "main" java.lang.NullPointerException
> at java.util.ArrayList.addAll(Unknown Source)
> at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:271)
> at org.apache.kafka.clients.Metadata.update(Metadata.java:185)
> at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:606)
> at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:583)
> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at TestNewConsumer$MyMirrorMakerNewConsumer.receive(TestNewConsumer.scala:146)
> at TestNewConsumer$.main(TestNewConsumer.scala:38)
> at TestNewConsumer.main(TestNewConsumer.scala)
> {color}