Posted to jira@kafka.apache.org by "MinsuJo (Jira)" <ji...@apache.org> on 2020/05/25 13:18:00 UTC

[jira] [Created] (KAFKA-10039) A Kafka broker was gracefully shut down, and incorrect metadata was passed to the Kafka Connect client.

MinsuJo created KAFKA-10039:
-------------------------------

             Summary: A Kafka broker was gracefully shut down, and incorrect metadata was passed to the Kafka Connect client.
                 Key: KAFKA-10039
                 URL: https://issues.apache.org/jira/browse/KAFKA-10039
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 2.3.1
            Reporter: MinsuJo


For server maintenance, one of the 20 brokers was shut down gracefully, but every worker in the Kafka Connect sink cluster suddenly died with the following NPE:

 
{code:java}
// error log: Kafka Connect distributed sink worker

[2020-05-22 15:16:20,433] ERROR [Worker clientId=connect-1, groupId=dc2-log-hyper-connector] Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
java.lang.NullPointerException
    at java.util.Objects.requireNonNull(Objects.java:203)
    at org.apache.kafka.common.Cluster.<init>(Cluster.java:134)
    at org.apache.kafka.common.Cluster.<init>(Cluster.java:89)
    at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:120)
    at org.apache.kafka.clients.MetadataCache.<init>(MetadataCache.java:82)
    at org.apache.kafka.clients.MetadataCache.<init>(MetadataCache.java:58)
    at org.apache.kafka.clients.Metadata.handleMetadataResponse(Metadata.java:325)
    at org.apache.kafka.clients.Metadata.update(Metadata.java:252)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1059)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:845)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:548)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.poll(WorkerCoordinator.java:154)
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.poll(WorkerGroupMember.java:166)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:355)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:245)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
 

The replication factor of all topics was greater than 2, and there were 50 topics with 200 partitions in total. After checking the error against the Kafka library source code, it appears that the NPE occurred while the Connect DistributedHerder (the client) was caching the metadata, including the broker node id set and the partition info set, received from the broker.
{code:java}
// source code: org.apache.kafka.common.Cluster (v2.3.1)

// index the partition infos by topic, topic+partition, and node
// note that this code is performance sensitive if there are a large number of partitions so we are careful
// to avoid unnecessary work
Map<TopicPartition, PartitionInfo> tmpPartitionsByTopicPartition = new HashMap<>(partitions.size());
Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
for (PartitionInfo p : partitions) {
    tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
    List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
    if (partitionsForTopic == null) {
        partitionsForTopic = new ArrayList<>();
        tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
    }
    partitionsForTopic.add(p);
    if (p.leader() != null) {
        // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
        // in the metadata response
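        // Note: tmpPartitionsByNode is populated earlier in this constructor with one
        // empty list per node in the metadata response. If p.leader() refers to a node
        // id that is absent from that response, the lookup below returns null and
        // requireNonNull throws the NPE seen above (Cluster.java:134).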
        List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
        partitionsForNode.add(p);
    }
}
{code}
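
Based on the snippet above, the failure can apparently be reproduced outside of Connect by handing the Cluster constructor a partition whose leader is not in the node list, which is effectively what a metadata response taken mid-shutdown would look like. Below is a minimal, hypothetical sketch against the kafka-clients 2.3.1 jar; the class name, broker hostnames, and topic name are made up:
{code:java}
// repro sketch (hypothetical): partition leader missing from the metadata node set
import java.util.Collections;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class ClusterNpeRepro {
    public static void main(String[] args) {
        Node liveBroker = new Node(1, "broker1.example.com", 9092);
        // Broker 2 is shutting down: still listed as the partition leader, but
        // no longer present in the brokers section of the metadata response.
        Node deadBroker = new Node(2, "broker2.example.com", 9092);

        PartitionInfo partition = new PartitionInfo("some-topic", 0,
                deadBroker,                  // leader
                new Node[]{deadBroker},      // replicas
                new Node[]{deadBroker});     // in-sync replicas

        // Throws java.lang.NullPointerException at Objects.requireNonNull,
        // because tmpPartitionsByNode has no entry for node id 2.
        new Cluster("test-cluster",
                Collections.singletonList(liveBroker),
                Collections.singletonList(partition),
                Collections.emptySet(),
                Collections.emptySet());
    }
}
{code}
If this is what happened, the metadata response itself violated the guarantee stated in the code comment ("if a partition has a non-null leader, it is one of the brokers returned in the metadata response"), which would point at the broker side rather than the Connect client.
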
How can this happen, and how should it be handled in the future? (Both the broker and the client are on v2.3.1.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)