You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2018/02/01 16:58:50 UTC
[kafka] branch trunk updated: KAFKA-6489;
Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7804ea1 KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
7804ea1 is described below
commit 7804ea173bdbb0f401ad0135442c563fb52f895c
Author: Jiangjie Qin <be...@gmail.com>
AuthorDate: Thu Feb 1 08:58:29 2018 -0800
KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of partitions, the consumer adds one topic at a time for the metadata refresh. We should instead add all the topics to the metadata upfront and perform just one metadata refresh.
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #4478 from becketqin/KAFKA-6849
---
.../kafka/clients/consumer/internals/Fetcher.java | 5 +-
.../java/org/apache/kafka/clients/MockClient.java | 18 +++++++-
.../clients/consumer/internals/FetcherTest.java | 53 +++++++++++++---------
3 files changed, 52 insertions(+), 24 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26..6d56139 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -604,11 +604,14 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
final Map<TopicPartition, Long> timestampsToSearch) {
// Group the partitions by node.
final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
+ // Add the topics to the metadata to do a single metadata fetch.
+ for (TopicPartition tp : timestampsToSearch.keySet())
+ metadata.add(tp.topic());
+
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
PartitionInfo info = metadata.fetch().partition(tp);
if (info == null) {
- metadata.add(tp.topic());
log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
return RequestFuture.staleMetadata();
} else if (info.leader() == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8b33472..d843414 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -198,6 +198,12 @@ public class MockClient implements KafkaClient {
if (metadataUpdate == null)
metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
else {
+ if (metadataUpdate.expectMatchRefreshTopics
+ && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
+ throw new IllegalStateException("The metadata topics does not match expectation. "
+ + "Expected topics: " + metadataUpdate.cluster.topics()
+ + ", asked topics: " + metadata.topics());
+ }
this.unavailableTopics = metadataUpdate.unavailableTopics;
metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
}
@@ -344,7 +350,13 @@ public class MockClient implements KafkaClient {
}
public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
- metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
+ metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+ }
+
+ public void prepareMetadataUpdate(Cluster cluster,
+ Set<String> unavailableTopics,
+ boolean expectMatchMetadataTopics) {
+ metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
}
public void setNode(Node node) {
@@ -433,9 +445,11 @@ public class MockClient implements KafkaClient {
private static class MetadataUpdate {
final Cluster cluster;
final Set<String> unavailableTopics;
- MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+ final boolean expectMatchRefreshTopics;
+ MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
this.cluster = cluster;
this.unavailableTopics = unavailableTopics;
+ this.expectMatchRefreshTopics = expectMatchRefreshTopics;
}
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 26d7a50..a3ea793 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
@@ -1942,41 +1943,51 @@ public class FetcherTest {
return 1;
}
- private void testGetOffsetsForTimesWithError(Errors errorForTp0,
- Errors errorForTp1,
- long offsetForTp0,
- long offsetForTp1,
- Long expectedOffsetForTp0,
- Long expectedOffsetForTp1) {
+ private void testGetOffsetsForTimesWithError(Errors errorForP0,
+ Errors errorForP1,
+ long offsetForP0,
+ long offsetForP1,
+ Long expectedOffsetForP0,
+ Long expectedOffsetForP1) {
client.reset();
- // Ensure metadata has both partition.
- Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ String topicName2 = "topic2";
+ TopicPartition t2p0 = new TopicPartition(topicName2, 0);
+ // Expect a metadata refresh.
+ metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+ Collections.<String>emptySet(),
+ time.milliseconds());
+
+ Map<String, Integer> partitionNumByTopic = new HashMap<>();
+ partitionNumByTopic.put(topicName, 2);
+ partitionNumByTopic.put(topicName2, 1);
+ cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+ // The metadata refresh should contain all the topics.
+ client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);
// First try should fail due to metadata error.
- client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
- client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+ client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+ client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
// Second try should succeed.
- client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
- client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+ client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+ client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
- timestampToSearch.put(tp0, 0L);
+ timestampToSearch.put(t2p0, 0L);
timestampToSearch.put(tp1, 0L);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
- if (expectedOffsetForTp0 == null)
- assertNull(offsetAndTimestampMap.get(tp0));
+ if (expectedOffsetForP0 == null)
+ assertNull(offsetAndTimestampMap.get(t2p0));
else {
- assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).timestamp());
- assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset());
+ assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).timestamp());
+ assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).offset());
}
- if (expectedOffsetForTp1 == null)
+ if (expectedOffsetForP1 == null)
assertNull(offsetAndTimestampMap.get(tp1));
else {
- assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
- assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset());
+ assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
+ assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).offset());
}
}
--
To stop receiving notification emails like this one, please contact
jqin@apache.org.