You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/01/10 01:04:43 UTC
[kafka] 01/04: Use PartitionMetadata and delete PartitionInfoAndEpoch
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch KAFKA-9261
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f79a1db9b80c6f5d61618baec934aee54f669dd4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Dec 2 19:28:12 2019 -0800
Use PartitionMetadata and delete PartitionInfoAndEpoch
---
.../java/org/apache/kafka/clients/Metadata.java | 36 +++++-----
.../org/apache/kafka/clients/MetadataCache.java | 77 ++++++++--------------
.../kafka/clients/consumer/internals/Fetcher.java | 38 ++++++-----
.../kafka/common/requests/MetadataResponse.java | 39 +++++++----
.../org/apache/kafka/clients/MetadataTest.java | 32 ++++-----
.../kafka/clients/admin/KafkaAdminClientTest.java | 20 +++---
.../internals/ConsumerCoordinatorTest.java | 4 +-
.../consumer/internals/ConsumerMetadataTest.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../kafka/common/requests/RequestResponseTest.java | 10 +--
.../test/java/org/apache/kafka/test/TestUtils.java | 4 +-
11 files changed, 130 insertions(+), 134 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 82c1b07..9589739 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -19,14 +19,12 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
@@ -199,11 +197,11 @@ public class Metadata implements Closeable {
/**
* Return the cached partition info if it exists and a newer leader epoch isn't known about.
*/
- public synchronized Optional<MetadataCache.PartitionInfoAndEpoch> partitionInfoIfCurrent(TopicPartition topicPartition) {
+ public synchronized Optional<MetadataResponse.PartitionMetadata> partitionMetadataIfCurrent(TopicPartition topicPartition) {
Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
if (epoch == null) {
// old cluster format (no epochs)
- return cache.getPartitionInfo(topicPartition);
+ return cache.partitionMetadata(topicPartition);
} else {
return cache.getPartitionInfoHavingEpoch(topicPartition, epoch);
}
@@ -289,7 +287,7 @@ public class Metadata implements Closeable {
private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
Set<String> internalTopics = new HashSet<>();
- List<MetadataCache.PartitionInfoAndEpoch> partitions = new ArrayList<>();
+ List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
if (!topicsToRetain.test(metadata))
continue;
@@ -305,7 +303,7 @@ public class Metadata implements Closeable {
if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
log.debug("Requesting metadata update for partition {} due to error {}",
- new TopicPartition(metadata.topic(), partitionMetadata.partition()), partitionMetadata.error());
+ partitionMetadata.topicPartition, partitionMetadata.error());
requestUpdate();
}
}
@@ -315,10 +313,13 @@ public class Metadata implements Closeable {
}
}
- return new MetadataCache(metadataResponse.clusterId(), new ArrayList<>(metadataResponse.brokers()), partitions,
+ return new MetadataCache(metadataResponse.clusterId(),
+ new ArrayList<>(metadataResponse.brokers()),
+ partitions,
metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
- internalTopics, metadataResponse.controller());
+ internalTopics,
+ metadataResponse.controller());
}
/**
@@ -327,25 +328,22 @@ public class Metadata implements Closeable {
private void updatePartitionInfo(String topic,
MetadataResponse.PartitionMetadata partitionMetadata,
boolean hasReliableLeaderEpoch,
- Consumer<MetadataCache.PartitionInfoAndEpoch> partitionInfoConsumer) {
+ Consumer<MetadataResponse.PartitionMetadata> partitionInfoConsumer) {
TopicPartition tp = new TopicPartition(topic, partitionMetadata.partition());
if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch().isPresent()) {
int newEpoch = partitionMetadata.leaderEpoch().get();
// If the received leader epoch is at least the same as the previous one, update the metadata
if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch, false)) {
- PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
- partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info, newEpoch));
+ partitionInfoConsumer.accept(partitionMetadata);
} else {
// Otherwise ignore the new metadata and use the previously cached info
- cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer);
+ cache.partitionMetadata(partitionMetadata.topicPartition).ifPresent(partitionInfoConsumer);
}
} else {
// Handle old cluster formats as well as error responses where leader and epoch are missing
lastSeenLeaderEpochs.remove(tp);
- PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
- partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info,
- RecordBatch.NO_PARTITION_LEADER_EPOCH));
+ partitionInfoConsumer.accept(partitionMetadata.withoutLeaderEpoch());
}
}
@@ -492,10 +490,10 @@ public class Metadata implements Closeable {
}
public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
- return partitionInfoIfCurrent(tp)
- .map(infoAndEpoch -> {
- Node leader = infoAndEpoch.partitionInfo().leader();
- return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, Optional.of(infoAndEpoch.epoch()));
+ return partitionMetadataIfCurrent(tp)
+ .map(partitionMetadata -> {
+ Node leader = partitionMetadata.leader();
+ return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, partitionMetadata.leaderEpoch());
})
.orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp)));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index e58da12..af595fe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.MetadataResponse;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -28,11 +29,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+
/**
* An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster
* instance which is optimized for read access.
@@ -44,13 +46,13 @@ public class MetadataCache {
private final Set<String> invalidTopics;
private final Set<String> internalTopics;
private final Node controller;
- private final Map<TopicPartition, PartitionInfoAndEpoch> metadataByPartition;
+ private final Map<TopicPartition, MetadataResponse.PartitionMetadata> metadataByPartition;
private Cluster clusterInstance;
MetadataCache(String clusterId,
List<Node> nodes,
- Collection<PartitionInfoAndEpoch> partitions,
+ Collection<MetadataResponse.PartitionMetadata> partitions,
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
@@ -60,7 +62,7 @@ public class MetadataCache {
MetadataCache(String clusterId,
List<Node> nodes,
- Collection<PartitionInfoAndEpoch> partitions,
+ Collection<MetadataResponse.PartitionMetadata> partitions,
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
@@ -74,8 +76,8 @@ public class MetadataCache {
this.controller = controller;
this.metadataByPartition = new HashMap<>(partitions.size());
- for (PartitionInfoAndEpoch p : partitions) {
- this.metadataByPartition.put(new TopicPartition(p.partitionInfo().topic(), p.partitionInfo().partition()), p);
+ for (MetadataResponse.PartitionMetadata p : partitions) {
+ this.metadataByPartition.put(p.topicPartition, p);
}
if (clusterInstance == null) {
@@ -88,12 +90,13 @@ public class MetadataCache {
/**
* Return the cached PartitionInfo iff it was for the given epoch
*/
- Optional<PartitionInfoAndEpoch> getPartitionInfoHavingEpoch(TopicPartition topicPartition, int epoch) {
- PartitionInfoAndEpoch infoAndEpoch = metadataByPartition.get(topicPartition);
- return Optional.ofNullable(infoAndEpoch).filter(infoEpoch -> infoEpoch.epoch() == epoch);
+ Optional<MetadataResponse.PartitionMetadata> getPartitionInfoHavingEpoch(TopicPartition topicPartition, int epoch) {
+ MetadataResponse.PartitionMetadata infoAndEpoch = metadataByPartition.get(topicPartition);
+ return Optional.ofNullable(infoAndEpoch)
+ .filter(infoEpoch -> infoEpoch.leaderEpoch().orElse(NO_PARTITION_LEADER_EPOCH) == epoch);
}
- Optional<PartitionInfoAndEpoch> getPartitionInfo(TopicPartition topicPartition) {
+ Optional<MetadataResponse.PartitionMetadata> partitionMetadata(TopicPartition topicPartition) {
return Optional.ofNullable(metadataByPartition.get(topicPartition));
}
@@ -108,9 +111,20 @@ public class MetadataCache {
private void computeClusterView() {
List<PartitionInfo> partitionInfos = metadataByPartition.values()
.stream()
- .map(PartitionInfoAndEpoch::partitionInfo)
+ .map(MetadataCache::buildPartitionInfo)
.collect(Collectors.toList());
- this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
+ this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics,
+ invalidTopics, internalTopics, controller);
+ }
+
+ static PartitionInfo buildPartitionInfo(MetadataResponse.PartitionMetadata metadata) {
+ return new PartitionInfo(
+ metadata.topic(),
+ metadata.partition(),
+ metadata.leader(),
+ metadata.replicas().toArray(new Node[0]),
+ metadata.isr().toArray(new Node[0]),
+ metadata.offlineReplicas().toArray(new Node[0]));
}
static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
@@ -137,43 +151,4 @@ public class MetadataCache {
'}';
}
- public static class PartitionInfoAndEpoch {
- private final PartitionInfo partitionInfo;
- private final int epoch;
-
- PartitionInfoAndEpoch(PartitionInfo partitionInfo, int epoch) {
- this.partitionInfo = partitionInfo;
- this.epoch = epoch;
- }
-
- public PartitionInfo partitionInfo() {
- return partitionInfo;
- }
-
- public int epoch() {
- return epoch;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- PartitionInfoAndEpoch that = (PartitionInfoAndEpoch) o;
- return epoch == that.epoch &&
- Objects.equals(partitionInfo, that.partitionInfo);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(partitionInfo, epoch);
- }
-
- @Override
- public String toString() {
- return "PartitionInfoAndEpoch{" +
- "partitionInfo=" + partitionInfo +
- ", epoch=" + epoch +
- '}';
- }
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a162ddb..ff9d738 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.clients.MetadataCache;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.ApiVersion;
import org.apache.kafka.clients.StaleMetadataException;
@@ -902,27 +901,32 @@ public class Fetcher<K, V> implements Closeable {
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
Long offset = entry.getValue();
- Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo = metadata.partitionInfoIfCurrent(tp);
+ Optional<MetadataResponse.PartitionMetadata> currentInfo = metadata.partitionMetadataIfCurrent(tp);
if (!currentInfo.isPresent()) {
log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
metadata.requestUpdate();
partitionsToRetry.add(tp);
- } else if (currentInfo.get().partitionInfo().leader() == null) {
- log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
- metadata.requestUpdate();
- partitionsToRetry.add(tp);
- } else if (client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
- client.maybeThrowAuthFailure(currentInfo.get().partitionInfo().leader());
-
- // The connection has failed and we need to await the blackout period before we can
- // try again. No need to request a metadata update since the disconnect will have
- // done so already.
- log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
- currentInfo.get().partitionInfo().leader(), tp);
- partitionsToRetry.add(tp);
} else {
- partitionDataMap.put(tp,
- new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch())));
+ MetadataResponse.PartitionMetadata partitionMetadata = currentInfo.get();
+ Node leader = partitionMetadata.leader();
+
+ if (leader == null) {
+ log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
+ metadata.requestUpdate();
+ partitionsToRetry.add(tp);
+ } else if (client.isUnavailable(leader)) {
+ client.maybeThrowAuthFailure(leader);
+
+ // The connection has failed and we need to await the blackout period before we can
+ // try again. No need to request a metadata update since the disconnect will have
+ // done so already.
+ log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
+ leader, tp);
+ partitionsToRetry.add(tp);
+ } else {
+ partitionDataMap.put(tp,
+ new ListOffsetRequest.PartitionData(offset, partitionMetadata.leaderEpoch()));
+ }
}
}
return regroupPartitionMapByNode(partitionDataMap);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index e4e09a5..16ec87d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -19,10 +19,11 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.MetadataResponseData;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
@@ -314,8 +315,9 @@ public class MetadataResponse extends AbstractResponse {
// This is used to describe per-partition state in the MetadataResponse
public static class PartitionMetadata {
+ public final TopicPartition topicPartition;
+
private final Errors error;
- private final int partition;
private final Node leader;
private final Optional<Integer> leaderEpoch;
private final List<Node> replicas;
@@ -323,14 +325,14 @@ public class MetadataResponse extends AbstractResponse {
private final List<Node> offlineReplicas;
public PartitionMetadata(Errors error,
- int partition,
+ TopicPartition topicPartition,
Node leader,
Optional<Integer> leaderEpoch,
List<Node> replicas,
List<Node> isr,
List<Node> offlineReplicas) {
this.error = error;
- this.partition = partition;
+ this.topicPartition = topicPartition;
this.leader = leader;
this.leaderEpoch = leaderEpoch;
this.replicas = replicas;
@@ -338,12 +340,26 @@ public class MetadataResponse extends AbstractResponse {
this.offlineReplicas = offlineReplicas;
}
+ public PartitionMetadata withoutLeaderEpoch() {
+ return new PartitionMetadata(error,
+ topicPartition,
+ leader,
+ Optional.empty(),
+ replicas,
+ isr,
+ offlineReplicas);
+ }
+
public Errors error() {
return error;
}
public int partition() {
- return partition;
+ return topicPartition.partition();
+ }
+
+ public String topic() {
+ return topicPartition.topic();
}
public int leaderId() {
@@ -372,9 +388,9 @@ public class MetadataResponse extends AbstractResponse {
@Override
public String toString() {
- return "(type=PartitionMetadata" +
+ return "PartitionMetadata(" +
", error=" + error +
- ", partition=" + partition +
+ ", partition=" + topicPartition +
", leader=" + leader +
", leaderEpoch=" + leaderEpoch +
", replicas=" + Utils.join(replicas, ",") +
@@ -417,8 +433,9 @@ public class MetadataResponse extends AbstractResponse {
List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
- partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
- replicaNodes, isrNodes, offlineNodes));
+ TopicPartition topicPartition = new TopicPartition(topic, partitionIndex);
+ partitionMetadataList.add(new PartitionMetadata(partitionError, topicPartition, leaderNode,
+ leaderEpoch, replicaNodes, isrNodes, offlineNodes));
}
topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
@@ -469,7 +486,7 @@ public class MetadataResponse extends AbstractResponse {
for (PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata) {
metadataResponseTopic.partitions().add(new MetadataResponsePartition()
.setErrorCode(partitionMetadata.error.code())
- .setPartitionIndex(partitionMetadata.partition)
+ .setPartitionIndex(partitionMetadata.partition())
.setLeaderId(partitionMetadata.leader == null ? -1 : partitionMetadata.leader.id())
.setLeaderEpoch(partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setReplicaNodes(partitionMetadata.replicas.stream().map(Node::id).collect(Collectors.toList()))
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 7067e88..440ee55 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -196,9 +196,9 @@ public class MetadataTest {
MetadataResponse response = new MetadataResponse(struct, version);
assertFalse(response.hasReliableLeaderEpochs());
metadata.update(response, 100);
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
- MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
- assertEquals(-1, info.epoch());
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
+ assertEquals(Optional.empty(), metadata.leaderEpoch());
}
for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
@@ -206,9 +206,9 @@ public class MetadataTest {
MetadataResponse response = new MetadataResponse(struct, version);
assertTrue(response.hasReliableLeaderEpochs());
metadata.update(response, 100);
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
- MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
- assertEquals(10, info.epoch());
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ MetadataResponse.PartitionMetadata info = metadata.partitionMetadataIfCurrent(tp).get();
+ assertEquals(Optional.of(10), info.leaderEpoch());
}
}
@@ -255,13 +255,13 @@ public class MetadataTest {
metadata.update(new MetadataResponse(data), 101);
assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
- MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
- List<Integer> cachedIsr = Arrays.stream(info.partitionInfo().inSyncReplicas())
+ List<Integer> cachedIsr = metadata.isr().stream()
.map(Node::id).collect(Collectors.toList());
assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
- assertEquals(10, info.epoch());
+ assertEquals(Optional.of(10), metadata.leaderEpoch());
}
@Test
@@ -419,14 +419,14 @@ public class MetadataTest {
// Cache of partition stays, but current partition info is not available since it's stale
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
- assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+ assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
// Metadata with older epoch is rejected, metadata state is unchanged
metadata.update(metadataResponse, 20L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
- assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+ assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
// Metadata with equal or newer epoch is accepted
@@ -434,7 +434,7 @@ public class MetadataTest {
metadata.update(metadataResponse, 30L);
assertNotNull(metadata.fetch().partition(tp));
assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
}
@@ -450,9 +450,9 @@ public class MetadataTest {
assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
// still works
- assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
- assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().partition(), 0);
- assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().leader().id(), 0);
+ assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+ assertEquals(metadata.partitionMetadataIfCurrent(tp).get().partition(), 0);
+ assertEquals(metadata.partitionMetadataIfCurrent(tp).get().leader().id(), 0);
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8aa222e..913166b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -339,7 +339,7 @@ public class KafkaAdminClientTest {
List<PartitionMetadata> pms = new ArrayList<>();
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
PartitionMetadata pm = new PartitionMetadata(error,
- pInfo.partition(),
+ new TopicPartition(topic, pInfo.partition()),
pInfo.leader(),
Optional.of(234),
Arrays.asList(pInfo.replicas()),
@@ -606,8 +606,8 @@ public class KafkaAdminClientTest {
// Then we respond to the DescribeTopic request
Node leader = initializedCluster.nodes().get(0);
MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
- Errors.NONE, 0, leader, Optional.of(10), singletonList(leader),
- singletonList(leader), singletonList(leader));
+ Errors.NONE, new TopicPartition(topic, 0), leader, Optional.of(10),
+ singletonList(leader), singletonList(leader), singletonList(leader));
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(), 1,
singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
@@ -982,10 +982,10 @@ public class KafkaAdminClientTest {
List<Node> nodes = env.cluster().nodes();
List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
- partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
+ partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0, nodes.get(0),
Optional.of(5), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
Collections.emptyList()));
- partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(), nodes.get(1),
+ partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1, nodes.get(1),
Optional.of(5), singletonList(nodes.get(1)), singletonList(nodes.get(1)), Collections.emptyList()));
List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
@@ -1046,16 +1046,16 @@ public class KafkaAdminClientTest {
List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, nodes.get(0), Optional.of(5),
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition0, nodes.get(0), Optional.of(5),
singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, nodes.get(0), Optional.of(5),
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition1, nodes.get(0), Optional.of(5),
singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
+ p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, myTopicPartition2, null,
Optional.empty(), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3, nodes.get(0), Optional.of(5),
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition3, nodes.get(0), Optional.of(5),
singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
- p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4, nodes.get(0), Optional.of(5),
+ p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition4, nodes.get(0), Optional.of(5),
singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7cde5b4..faec501 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1352,8 +1352,8 @@ public class ConsumerCoordinatorTest {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
Node node = new Node(0, "localhost", 9999);
MetadataResponse.PartitionMetadata partitionMetadata =
- new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node, Optional.empty(),
- singletonList(node), singletonList(node), singletonList(node));
+ new MetadataResponse.PartitionMetadata(Errors.NONE, new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+ node, Optional.empty(), singletonList(node), singletonList(node), singletonList(node));
MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
Topic.GROUP_METADATA_TOPIC_NAME, true, singletonList(partitionMetadata));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 33d102d..ce90089 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -157,7 +157,7 @@ public class ConsumerMetadataTest {
private MetadataResponse.TopicMetadata topicMetadata(String topic, boolean isInternal) {
MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE,
- 0, node, Optional.of(5), singletonList(node), singletonList(node), singletonList(node));
+ new TopicPartition(topic, 0), node, Optional.of(5), singletonList(node), singletonList(node), singletonList(node));
return new MetadataResponse.TopicMetadata(Errors.NONE, topic, isInternal, singletonList(partitionMetadata));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6440c42..ebad2e0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1947,7 +1947,7 @@ public class FetcherTest {
for (MetadataResponse.PartitionMetadata p : partitions) {
altPartitions.add(new MetadataResponse.PartitionMetadata(
p.error(),
- p.partition(),
+ new TopicPartition(item.topic(), p.partition()),
null, //no leader
Optional.empty(),
p.replicas(),
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1e88fc7..e5bd1df 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1169,13 +1169,15 @@ public class RequestResponseTest {
List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
- asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node,
- Optional.of(5), replicas, isr, offlineReplicas))));
+ asList(new MetadataResponse.PartitionMetadata(Errors.NONE,
+ new TopicPartition("__consumer_offsets", 1), node, Optional.of(5),
+ replicas, isr, offlineReplicas))));
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
Collections.emptyList()));
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false,
- asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
- Optional.empty(), replicas, isr, offlineReplicas))));
+ asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
+ new TopicPartition("topic3", 0), null,
+ Optional.empty(), replicas, isr, offlineReplicas))));
return MetadataResponse.prepareResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
}
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ece5af3..916feea 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -160,7 +160,7 @@ public class TestUtils {
Node leader = nodes.get(i % nodes.size());
List<Node> replicas = Collections.singletonList(leader);
partitionMetadata.add(partitionSupplier.supply(
- Errors.NONE, i, leader, Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
+ Errors.NONE, tp, leader, Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
}
topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
@@ -179,7 +179,7 @@ public class TestUtils {
@FunctionalInterface
public interface PartitionMetadataSupplier {
MetadataResponse.PartitionMetadata supply(Errors error,
- int partition,
+ TopicPartition partition,
Node leader,
Optional<Integer> leaderEpoch,
List<Node> replicas,