You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/01 17:01:00 UTC

[jira] [Commented] (KAFKA-6489) Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata refresh topics set.

    [ https://issues.apache.org/jira/browse/KAFKA-6489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348905#comment-16348905 ] 

ASF GitHub Bot commented on KAFKA-6489:
---------------------------------------

becketqin closed pull request #4478: KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
URL: https://github.com/apache/kafka/pull/4478
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26f8fb..6d56139118a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -604,11 +604,14 @@ private long endTimestamp() {
             final Map<TopicPartition, Long> timestampsToSearch) {
         // Group the partitions by node.
         final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
+        // Add the topics to the metadata to do a single metadata fetch.
+        for (TopicPartition tp : timestampsToSearch.keySet())
+            metadata.add(tp.topic());
+
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
             PartitionInfo info = metadata.fetch().partition(tp);
             if (info == null) {
-                metadata.add(tp.topic());
                 log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
                 return RequestFuture.staleMetadata();
             } else if (info.leader() == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8b334729247..d843414fd7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -198,6 +198,12 @@ public void send(ClientRequest request, long now) {
             if (metadataUpdate == null)
                 metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
             else {
+                if (metadataUpdate.expectMatchRefreshTopics
+                    && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
+                    throw new IllegalStateException("The metadata topics does not match expectation. "
+                                                        + "Expected topics: " + metadataUpdate.cluster.topics()
+                                                        + ", asked topics: " + metadata.topics());
+                }
                 this.unavailableTopics = metadataUpdate.unavailableTopics;
                 metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
             }
@@ -344,7 +350,13 @@ public void reset() {
     }
 
     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+    }
+
+    public void prepareMetadataUpdate(Cluster cluster,
+                                      Set<String> unavailableTopics,
+                                      boolean expectMatchMetadataTopics) {
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
     }
 
     public void setNode(Node node) {
@@ -433,9 +445,11 @@ public void setNodeApiVersions(NodeApiVersions nodeApiVersions) {
     private static class MetadataUpdate {
         final Cluster cluster;
         final Set<String> unavailableTopics;
-        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+        final boolean expectMatchRefreshTopics;
+        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
             this.cluster = cluster;
             this.unavailableTopics = unavailableTopics;
+            this.expectMatchRefreshTopics = expectMatchRefreshTopics;
         }
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 26d7a50cf60..a3ea79356f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
@@ -1942,41 +1943,51 @@ private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset
         return 1;
     }
 
-    private void testGetOffsetsForTimesWithError(Errors errorForTp0,
-                                                 Errors errorForTp1,
-                                                 long offsetForTp0,
-                                                 long offsetForTp1,
-                                                 Long expectedOffsetForTp0,
-                                                 Long expectedOffsetForTp1) {
+    private void testGetOffsetsForTimesWithError(Errors errorForP0,
+                                                 Errors errorForP1,
+                                                 long offsetForP0,
+                                                 long offsetForP1,
+                                                 Long expectedOffsetForP0,
+                                                 Long expectedOffsetForP1) {
         client.reset();
-        // Ensure metadata has both partition.
-        Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        String topicName2 = "topic2";
+        TopicPartition t2p0 = new TopicPartition(topicName2, 0);
+        // Expect a metadata refresh.
+        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+                        Collections.<String>emptySet(),
+                        time.milliseconds());
+
+        Map<String, Integer> partitionNumByTopic = new HashMap<>();
+        partitionNumByTopic.put(topicName, 2);
+        partitionNumByTopic.put(topicName2, 1);
+        cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+        // The metadata refresh should contain all the topics.
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);
 
         // First try should fail due to metadata error.
-        client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
         // Second try should succeed.
-        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
-        timestampToSearch.put(tp0, 0L);
+        timestampToSearch.put(t2p0, 0L);
         timestampToSearch.put(tp1, 0L);
         Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
 
-        if (expectedOffsetForTp0 == null)
-            assertNull(offsetAndTimestampMap.get(tp0));
+        if (expectedOffsetForP0 == null)
+            assertNull(offsetAndTimestampMap.get(t2p0));
         else {
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).timestamp());
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).timestamp());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).offset());
         }
 
-        if (expectedOffsetForTp1 == null)
+        if (expectedOffsetForP1 == null)
             assertNull(offsetAndTimestampMap.get(tp1));
         else {
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).offset());
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Fetcher.retrieveOffsetsByTimes() should add all the topics to the metadata refresh topics set.
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6489
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6489
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 1.0.0
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>            Priority: Major
>             Fix For: 1.2.0
>
>
> Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of partitions, the consumer will add one topic at a time for the metadata refresh. We should add all the topics to the metadata topics and do just one metadata refresh.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)