Posted to commits@kafka.apache.org by jq...@apache.org on 2018/02/01 16:58:50 UTC

[kafka] branch trunk updated: KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7804ea1  KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
7804ea1 is described below

commit 7804ea173bdbb0f401ad0135442c563fb52f895c
Author: Jiangjie Qin <be...@gmail.com>
AuthorDate: Thu Feb 1 08:58:29 2018 -0800

    KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
    
    Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of partitions, the consumer adds one topic at a time to the metadata for the refresh, triggering a separate metadata fetch per unknown topic. We should add all the topics to the metadata topics up front and do just one metadata refresh instead, as the sketch below illustrates.
    
    Author: Jiangjie Qin <be...@gmail.com>
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
    
    Closes #4478 from becketqin/KAFKA-6849
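
For illustration only, a minimal sketch of the batching idea described above. The class and method names are hypothetical stand-ins, not the actual Fetcher internals:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch: register every topic of interest before refreshing, so a single
    // metadata refresh covers all partitions instead of one refresh per topic.
    public class BatchedMetadataSketch {
        private final Set<String> metadataTopics = new HashSet<>(); // hypothetical
        private int refreshCount = 0;

        // Old behavior (roughly): each newly-seen topic triggered its own refresh.
        void lookupOneAtATime(Set<String> topics) {
            for (String topic : topics) {
                if (metadataTopics.add(topic))
                    refresh();
            }
        }

        // New behavior: add all topics first, then refresh once.
        void lookupBatched(Set<String> topics) {
            metadataTopics.addAll(topics);
            refresh();
        }

        private void refresh() {
            refreshCount++;
        }

        public static void main(String[] args) {
            Set<String> topics = new HashSet<>();
            for (int i = 0; i < 100; i++)
                topics.add("topic-" + i);

            BatchedMetadataSketch oneAtATime = new BatchedMetadataSketch();
            oneAtATime.lookupOneAtATime(topics);   // ~100 refreshes

            BatchedMetadataSketch batched = new BatchedMetadataSketch();
            batched.lookupBatched(topics);         // 1 refresh
            System.out.println(oneAtATime.refreshCount + " vs " + batched.refreshCount);
        }
    }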
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  5 +-
 .../java/org/apache/kafka/clients/MockClient.java  | 18 +++++++-
 .../clients/consumer/internals/FetcherTest.java    | 53 +++++++++++++---------
 3 files changed, 52 insertions(+), 24 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26..6d56139 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -604,11 +604,14 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             final Map<TopicPartition, Long> timestampsToSearch) {
         // Group the partitions by node.
         final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
+        // Add the topics to the metadata to do a single metadata fetch.
+        for (TopicPartition tp : timestampsToSearch.keySet())
+            metadata.add(tp.topic());
+
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
             PartitionInfo info = metadata.fetch().partition(tp);
             if (info == null) {
-                metadata.add(tp.topic());
                 log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
                 return RequestFuture.staleMetadata();
             } else if (info.leader() == null) {
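
The hunk above moves the metadata.add() calls ahead of the per-partition lookup, so every topic is registered before a single refresh. For context, a hypothetical caller-side sketch using the public KafkaConsumer API that exercises this path (broker address, group id, and topic names are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetsForTimesExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "offsets-for-times-example"); // placeholder
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Many partitions in one call: with this patch the consumer does a
                // single metadata refresh covering all of them, rather than one
                // refresh per unknown topic.
                Map<TopicPartition, Long> timestamps = new HashMap<>();
                for (int p = 0; p < 50; p++)
                    timestamps.put(new TopicPartition("topic-" + p, 0), 0L);
                Map<TopicPartition, OffsetAndTimestamp> result =
                        consumer.offsetsForTimes(timestamps);
                result.forEach((tp, oat) -> System.out.println(tp + " -> " + oat));
            }
        }
    }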
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8b33472..d843414 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -198,6 +198,12 @@ public class MockClient implements KafkaClient {
             if (metadataUpdate == null)
                 metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
             else {
+                if (metadataUpdate.expectMatchRefreshTopics
+                    && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
+                    throw new IllegalStateException("The metadata topics do not match the expectation. "
+                                                        + "Expected topics: " + metadataUpdate.cluster.topics()
+                                                        + ", actual topics: " + metadata.topics());
+                }
                 this.unavailableTopics = metadataUpdate.unavailableTopics;
                 metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
             }
@@ -344,7 +350,13 @@ public class MockClient implements KafkaClient {
     }
 
     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+    }
+
+    public void prepareMetadataUpdate(Cluster cluster,
+                                      Set<String> unavailableTopics,
+                                      boolean expectMatchMetadataTopics) {
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
     }
 
     public void setNode(Node node) {
@@ -433,9 +445,11 @@ public class MockClient implements KafkaClient {
     private static class MetadataUpdate {
         final Cluster cluster;
         final Set<String> unavailableTopics;
-        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+        final boolean expectMatchRefreshTopics;
+        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
             this.cluster = cluster;
             this.unavailableTopics = unavailableTopics;
+            this.expectMatchRefreshTopics = expectMatchRefreshTopics;
         }
     }
 }
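
With the new three-argument prepareMetadataUpdate overload, a test can assert that the consumer requested exactly the expected topic set in a single refresh. A short sketch of the intended usage, borrowing names (client, TestUtils, the topics and partition counts) from the FetcherTest change below:

    Map<String, Integer> partitionNumByTopic = new HashMap<>();
    partitionNumByTopic.put("topic", 2);
    partitionNumByTopic.put("topic2", 1);
    Cluster cluster = TestUtils.clusterWith(2, partitionNumByTopic);
    // The 'true' flag makes MockClient throw if the metadata topics at
    // refresh time differ from this cluster's topics, i.e. if the consumer
    // failed to batch all topics into one refresh.
    client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);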
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 26d7a50..a3ea793 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
@@ -1942,41 +1943,51 @@ public class FetcherTest {
         return 1;
     }
 
-    private void testGetOffsetsForTimesWithError(Errors errorForTp0,
-                                                 Errors errorForTp1,
-                                                 long offsetForTp0,
-                                                 long offsetForTp1,
-                                                 Long expectedOffsetForTp0,
-                                                 Long expectedOffsetForTp1) {
+    private void testGetOffsetsForTimesWithError(Errors errorForP0,
+                                                 Errors errorForP1,
+                                                 long offsetForP0,
+                                                 long offsetForP1,
+                                                 Long expectedOffsetForP0,
+                                                 Long expectedOffsetForP1) {
         client.reset();
-        // Ensure metadata has both partition.
-        Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        String topicName2 = "topic2";
+        TopicPartition t2p0 = new TopicPartition(topicName2, 0);
+        // Expect a metadata refresh.
+        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+                        Collections.<String>emptySet(),
+                        time.milliseconds());
+
+        Map<String, Integer> partitionNumByTopic = new HashMap<>();
+        partitionNumByTopic.put(topicName, 2);
+        partitionNumByTopic.put(topicName2, 1);
+        cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+        // The metadata refresh should contain all the topics.
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);
 
         // First try should fail due to metadata error.
-        client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
         // Second try should succeed.
-        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
-        timestampToSearch.put(tp0, 0L);
+        timestampToSearch.put(t2p0, 0L);
         timestampToSearch.put(tp1, 0L);
         Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
 
-        if (expectedOffsetForTp0 == null)
-            assertNull(offsetAndTimestampMap.get(tp0));
+        if (expectedOffsetForP0 == null)
+            assertNull(offsetAndTimestampMap.get(t2p0));
         else {
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).timestamp());
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).timestamp());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).offset());
         }
 
-        if (expectedOffsetForTp1 == null)
+        if (expectedOffsetForP1 == null)
             assertNull(offsetAndTimestampMap.get(tp1));
         else {
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).offset());
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
jqin@apache.org.