Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2016/04/13 12:14:25 UTC

[jira] [Commented] (KAFKA-3358) Only request metadata updates once we have topics or a pattern subscription

    [ https://issues.apache.org/jira/browse/KAFKA-3358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239013#comment-15239013 ] 

Ismael Juma commented on KAFKA-3358:
------------------------------------

Phil Luckhurst mentioned on the mailing list that an additional aspect is that we send metadata requests quite frequently if the producer is started but not used:

{noformat}
With debug logging turned on we've sometimes seen our logs filling up with the Kafka producer sending metadata requests
every 100ms, e.g.

2016-04-08 10:39:33,592 DEBUG [kafka-producer-network-thread | phil-pa-1] org.apache.kafka.clients.NetworkClient: Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=249,client_id=phil-pa-1}, body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient, createdTimeMs=1460108373592, sendTimeMs=0) to node 0
2016-04-08 10:39:33,592 DEBUG [kafka-producer-network-thread | phil-pa-1] org.apache.kafka.clients.Metadata: Updated cluster metadata version 248 to Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic = phil-pa-1-device-update, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
2016-04-08 10:39:33,698 DEBUG [kafka-producer-network-thread | phil-pa-1] org.apache.kafka.clients.NetworkClient: Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=250,client_id=phil-pa-1}, body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient, createdTimeMs=1460108373698, sendTimeMs=0) to node 0
2016-04-08 10:39:33,698 DEBUG [kafka-producer-network-thread | phil-pa-1] org.apache.kafka.clients.Metadata: Updated cluster metadata version 249 to Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic = phil-pa-1-device-update, partition = 0, leader = 0, replicas = [0,], isr = [0,]])

These metadata requests continue to be sent every 100ms (retry.backoff.ms) until we stop the process.

This only seems to happen if the KafkaProducer instance is created but not used to publish a message for 5 minutes. After 5
minutes (metadata.max.age.ms) the producer thread sends a metadata request with an empty topics list to the server, and the
server responds with the partition information for *all* topics it hosts.

2016-04-11 14:16:39,320 DEBUG [kafka-producer-network-thread | phil-pa-1] org.apache.kafka.clients.NetworkClient: Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=phil-pa-1}, body={topics=[]}), isInitiatedByNetworkClient, createdTimeMs=1460380599289, sendTimeMs=0) to node -1

If we then use that KafkaProducer instance to send a message, the next 'Sending metadata request' will just be for the topic we
have sent the message to, and this then triggers the flood of retry requests as noted above.

If we ensure we send the first message within the time set by metadata.max.age.ms (default 5 minutes) then everything works as
expected and the metadata requests do not continually get retried.

In many cases I can understand that creating a KafkaProducer and then not using it within 5 minutes is not usual, but in our case
we're creating it when our REST based application starts up and we can't guarantee that a message will be published within that
time. To get around this we are currently posting a test message to the topic right after creating the KafkaProducer, which
prevents it happening.
{noformat}
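
To make the timeline above concrete, here is a minimal sketch (not from Phil's report; the broker address and serializers are illustrative) of the usage pattern that triggers the behaviour:

{noformat}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdleProducerRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ta-eng-kafka2:9092"); // illustrative broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Idle past metadata.max.age.ms (default 300000 ms = 5 minutes). The
        // background sender thread refreshes metadata with an empty topics list,
        // which the broker answers with partition info for *all* topics.
        Thread.sleep(6 * 60 * 1000L);

        // The first send narrows the metadata request to this one topic; with
        // the behaviour described above, the client then re-requests metadata
        // every retry.backoff.ms (100 ms) until the process stops.
        producer.send(new ProducerRecord<>("phil-pa-1-device-update", "key", "value"));

        producer.close();
    }
}
{noformat}

Phil's workaround of posting a test message presumably works because it gets a topic-specific metadata request in before the age-based refresh fires; calling producer.partitionsFor("phil-pa-1-device-update") right after construction should fetch topic-specific metadata in the same way, though that hasn't been verified against this issue.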

Phil investigated some more and said:

{noformat}
The request does succeed; the reason it keeps requesting is a check in the Sender.run(long now) method.

    public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

It looks like the this.accumulator.ready(cluster, now) method checks the leader for each partition against the cluster metadata it
currently holds. In this case the original metadata request had the empty topic list, so it got information for all partitions, but after
using the producer the cluster only contains the one topic, which means this check sets unknownLeadersExist = true.
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;

As you can see above, the Sender.run method checks for this in the result and then calls this.metadata.requestUpdate(), which
triggers the metadata to be requested again. Of course the same thing happens when the next response is checked, and
we're suddenly in the loop forever.
{noformat}
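
To illustrate the shape of that loop, here is a simplified, self-contained sketch (hypothetical names, not the actual Kafka source): a ready-style check that finds no leader for a partition flags it, the flag forces a metadata update, the refreshed metadata still doesn't cover the partition, and the cycle repeats at retry.backoff.ms intervals:

{noformat}
import java.util.HashMap;
import java.util.Map;

public class MetadataLoopSketch {
    // Stand-in for the Cluster metadata: partition name -> leader node id.
    private static final Map<String, Integer> leaders = new HashMap<>();

    // Stand-in for cluster.leaderFor(part); null means the leader is unknown.
    private static Integer leaderFor(String partition) {
        return leaders.get(partition);
    }

    public static void main(String[] args) throws InterruptedException {
        // A partition that is missing from the narrowed metadata.
        String part = "some-topic-0";
        for (int pass = 1; pass <= 3; pass++) {
            // Equivalent of RecordAccumulator.ready(...) setting unknownLeadersExist.
            boolean unknownLeadersExist = (leaderFor(part) == null);
            if (unknownLeadersExist) {
                // Equivalent of this.metadata.requestUpdate() in Sender.run.
                System.out.println("pass " + pass + ": leader unknown, requesting metadata update");
            }
            // The simulated refresh never adds the missing partition, so each
            // pass reaches the same conclusion: a loop paced only by the backoff.
            Thread.sleep(100); // retry.backoff.ms default
        }
    }
}
{noformat}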

It's possible that this would also resolve itself if we only updated the cluster metadata and didn't ask for any topics, but that needs to be verified.
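
For reference, a guard along the lines of the issue title might look like the following sketch (hypothetical method and parameter names, not an actual patch): skip the update entirely until the client has topics or a pattern subscription to ask about, and otherwise keep the usual age-based refresh.

{noformat}
import java.util.Set;

public class MetadataUpdateGuard {
    // Hypothetical helper: decide whether requesting a metadata update is useful.
    static boolean metadataUpdateNeeded(Set<String> topicsOfInterest,
                                        boolean hasPatternSubscription,
                                        long nowMs, long lastRefreshMs, long maxAgeMs) {
        // With no topics and no pattern subscription there is nothing useful to
        // request, so don't ask for all topics (or flood the broker with retries).
        if (topicsOfInterest.isEmpty() && !hasPatternSubscription)
            return false;
        // Otherwise keep the normal metadata.max.age.ms refresh behaviour.
        return nowMs - lastRefreshMs >= maxAgeMs;
    }
}
{noformat}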

> Only request metadata updates once we have topics or a pattern subscription
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3358
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3358
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.9.0.0, 0.9.0.1
>            Reporter: Ismael Juma
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 0.10.1.0
>
>
> The current code requests a metadata update for _all_ topics which can cause major load issues in large clusters.


