You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/01/10 01:04:43 UTC

[kafka] 01/04: Use PartitionMetadata and delete PartitionInfoAndEpoch

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch KAFKA-9261
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f79a1db9b80c6f5d61618baec934aee54f669dd4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Dec 2 19:28:12 2019 -0800

    Use PartitionMetadata and delete PartitionInfoAndEpoch
---
 .../java/org/apache/kafka/clients/Metadata.java    | 36 +++++-----
 .../org/apache/kafka/clients/MetadataCache.java    | 77 ++++++++--------------
 .../kafka/clients/consumer/internals/Fetcher.java  | 38 ++++++-----
 .../kafka/common/requests/MetadataResponse.java    | 39 +++++++----
 .../org/apache/kafka/clients/MetadataTest.java     | 32 ++++-----
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 20 +++---
 .../internals/ConsumerCoordinatorTest.java         |  4 +-
 .../consumer/internals/ConsumerMetadataTest.java   |  2 +-
 .../clients/consumer/internals/FetcherTest.java    |  2 +-
 .../kafka/common/requests/RequestResponseTest.java | 10 +--
 .../test/java/org/apache/kafka/test/TestUtils.java |  4 +-
 11 files changed, 130 insertions(+), 134 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 82c1b07..9589739 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -19,14 +19,12 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -199,11 +197,11 @@ public class Metadata implements Closeable {
     /**
      * Return the cached partition info if it exists and a newer leader epoch isn't known about.
      */
-    public synchronized Optional<MetadataCache.PartitionInfoAndEpoch> partitionInfoIfCurrent(TopicPartition topicPartition) {
+    public synchronized Optional<MetadataResponse.PartitionMetadata> partitionMetadataIfCurrent(TopicPartition topicPartition) {
         Integer epoch = lastSeenLeaderEpochs.get(topicPartition);
         if (epoch == null) {
             // old cluster format (no epochs)
-            return cache.getPartitionInfo(topicPartition);
+            return cache.partitionMetadata(topicPartition);
         } else {
             return cache.getPartitionInfoHavingEpoch(topicPartition, epoch);
         }
@@ -289,7 +287,7 @@ public class Metadata implements Closeable {
     private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse,
                                                  Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
-        List<MetadataCache.PartitionInfoAndEpoch> partitions = new ArrayList<>();
+        List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             if (!topicsToRetain.test(metadata))
                 continue;
@@ -305,7 +303,7 @@ public class Metadata implements Closeable {
 
                     if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
                         log.debug("Requesting metadata update for partition {} due to error {}",
-                                new TopicPartition(metadata.topic(), partitionMetadata.partition()), partitionMetadata.error());
+                                partitionMetadata.topicPartition, partitionMetadata.error());
                         requestUpdate();
                     }
                 }
@@ -315,10 +313,13 @@ public class Metadata implements Closeable {
             }
         }
 
-        return new MetadataCache(metadataResponse.clusterId(), new ArrayList<>(metadataResponse.brokers()), partitions,
+        return new MetadataCache(metadataResponse.clusterId(),
+                new ArrayList<>(metadataResponse.brokers()),
+                partitions,
                 metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
-                internalTopics, metadataResponse.controller());
+                internalTopics,
+                metadataResponse.controller());
     }
 
     /**
@@ -327,25 +328,22 @@ public class Metadata implements Closeable {
     private void updatePartitionInfo(String topic,
                                      MetadataResponse.PartitionMetadata partitionMetadata,
                                      boolean hasReliableLeaderEpoch,
-                                     Consumer<MetadataCache.PartitionInfoAndEpoch> partitionInfoConsumer) {
+                                     Consumer<MetadataResponse.PartitionMetadata> partitionInfoConsumer) {
         TopicPartition tp = new TopicPartition(topic, partitionMetadata.partition());
 
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch().isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch().get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch, false)) {
-                PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
-                partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info, newEpoch));
+                partitionInfoConsumer.accept(partitionMetadata);
             } else {
                 // Otherwise ignore the new metadata and use the previously cached info
-                cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer);
+                cache.partitionMetadata(partitionMetadata.topicPartition).ifPresent(partitionInfoConsumer);
             }
         } else {
             // Handle old cluster formats as well as error responses where leader and epoch are missing
             lastSeenLeaderEpochs.remove(tp);
-            PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
-            partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info,
-                    RecordBatch.NO_PARTITION_LEADER_EPOCH));
+            partitionInfoConsumer.accept(partitionMetadata.withoutLeaderEpoch());
         }
     }
 
@@ -492,10 +490,10 @@ public class Metadata implements Closeable {
     }
 
     public synchronized LeaderAndEpoch leaderAndEpoch(TopicPartition tp) {
-        return partitionInfoIfCurrent(tp)
-                .map(infoAndEpoch -> {
-                    Node leader = infoAndEpoch.partitionInfo().leader();
-                    return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, Optional.of(infoAndEpoch.epoch()));
+        return partitionMetadataIfCurrent(tp)
+                .map(partitionMetadata -> {
+                    Node leader = partitionMetadata.leader();
+                    return new LeaderAndEpoch(leader == null ? Node.noNode() : leader, partitionMetadata.leaderEpoch());
                 })
                 .orElse(new LeaderAndEpoch(Node.noNode(), lastSeenLeaderEpoch(tp)));
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index e58da12..af595fe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.MetadataResponse;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -28,11 +29,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
+
 /**
  * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster
  * instance which is optimized for read access.
@@ -44,13 +46,13 @@ public class MetadataCache {
     private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
     private final Node controller;
-    private final Map<TopicPartition, PartitionInfoAndEpoch> metadataByPartition;
+    private final Map<TopicPartition, MetadataResponse.PartitionMetadata> metadataByPartition;
 
     private Cluster clusterInstance;
 
     MetadataCache(String clusterId,
                   List<Node> nodes,
-                  Collection<PartitionInfoAndEpoch> partitions,
+                  Collection<MetadataResponse.PartitionMetadata> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
                   Set<String> internalTopics,
@@ -60,7 +62,7 @@ public class MetadataCache {
 
     MetadataCache(String clusterId,
                   List<Node> nodes,
-                  Collection<PartitionInfoAndEpoch> partitions,
+                  Collection<MetadataResponse.PartitionMetadata> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
                   Set<String> internalTopics,
@@ -74,8 +76,8 @@ public class MetadataCache {
         this.controller = controller;
 
         this.metadataByPartition = new HashMap<>(partitions.size());
-        for (PartitionInfoAndEpoch p : partitions) {
-            this.metadataByPartition.put(new TopicPartition(p.partitionInfo().topic(), p.partitionInfo().partition()), p);
+        for (MetadataResponse.PartitionMetadata p : partitions) {
+            this.metadataByPartition.put(p.topicPartition, p);
         }
 
         if (clusterInstance == null) {
@@ -88,12 +90,13 @@ public class MetadataCache {
     /**
      * Return the cached PartitionInfo iff it was for the given epoch
      */
-    Optional<PartitionInfoAndEpoch> getPartitionInfoHavingEpoch(TopicPartition topicPartition, int epoch) {
-        PartitionInfoAndEpoch infoAndEpoch = metadataByPartition.get(topicPartition);
-        return Optional.ofNullable(infoAndEpoch).filter(infoEpoch -> infoEpoch.epoch() == epoch);
+    Optional<MetadataResponse.PartitionMetadata> getPartitionInfoHavingEpoch(TopicPartition topicPartition, int epoch) {
+        MetadataResponse.PartitionMetadata infoAndEpoch = metadataByPartition.get(topicPartition);
+        return Optional.ofNullable(infoAndEpoch)
+                .filter(infoEpoch -> infoEpoch.leaderEpoch().orElse(NO_PARTITION_LEADER_EPOCH) == epoch);
     }
 
-    Optional<PartitionInfoAndEpoch> getPartitionInfo(TopicPartition topicPartition) {
+    Optional<MetadataResponse.PartitionMetadata> partitionMetadata(TopicPartition topicPartition) {
         return Optional.ofNullable(metadataByPartition.get(topicPartition));
     }
 
@@ -108,9 +111,20 @@ public class MetadataCache {
     private void computeClusterView() {
         List<PartitionInfo> partitionInfos = metadataByPartition.values()
                 .stream()
-                .map(PartitionInfoAndEpoch::partitionInfo)
+                .map(MetadataCache::buildPartitionInfo)
                 .collect(Collectors.toList());
-        this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
+        this.clusterInstance = new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics,
+                invalidTopics, internalTopics, controller);
+    }
+
+    static PartitionInfo buildPartitionInfo(MetadataResponse.PartitionMetadata metadata) {
+        return new PartitionInfo(
+                metadata.topic(),
+                metadata.partition(),
+                metadata.leader(),
+                metadata.replicas().toArray(new Node[0]),
+                metadata.isr().toArray(new Node[0]),
+                metadata.offlineReplicas().toArray(new Node[0]));
     }
 
     static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
@@ -137,43 +151,4 @@ public class MetadataCache {
                 '}';
     }
 
-    public static class PartitionInfoAndEpoch {
-        private final PartitionInfo partitionInfo;
-        private final int epoch;
-
-        PartitionInfoAndEpoch(PartitionInfo partitionInfo, int epoch) {
-            this.partitionInfo = partitionInfo;
-            this.epoch = epoch;
-        }
-
-        public PartitionInfo partitionInfo() {
-            return partitionInfo;
-        }
-
-        public int epoch() {
-            return epoch;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            PartitionInfoAndEpoch that = (PartitionInfoAndEpoch) o;
-            return epoch == that.epoch &&
-                    Objects.equals(partitionInfo, that.partitionInfo);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partitionInfo, epoch);
-        }
-
-        @Override
-        public String toString() {
-            return "PartitionInfoAndEpoch{" +
-                    "partitionInfo=" + partitionInfo +
-                    ", epoch=" + epoch +
-                    '}';
-        }
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a162ddb..ff9d738 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
-import org.apache.kafka.clients.MetadataCache;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.ApiVersion;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -902,27 +901,32 @@ public class Fetcher<K, V> implements Closeable {
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
             Long offset = entry.getValue();
-            Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo = metadata.partitionInfoIfCurrent(tp);
+            Optional<MetadataResponse.PartitionMetadata> currentInfo = metadata.partitionMetadataIfCurrent(tp);
             if (!currentInfo.isPresent()) {
                 log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
                 metadata.requestUpdate();
                 partitionsToRetry.add(tp);
-            } else if (currentInfo.get().partitionInfo().leader() == null) {
-                log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
-                metadata.requestUpdate();
-                partitionsToRetry.add(tp);
-            } else if (client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
-                client.maybeThrowAuthFailure(currentInfo.get().partitionInfo().leader());
-
-                // The connection has failed and we need to await the blackout period before we can
-                // try again. No need to request a metadata update since the disconnect will have
-                // done so already.
-                log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
-                        currentInfo.get().partitionInfo().leader(), tp);
-                partitionsToRetry.add(tp);
             } else {
-                partitionDataMap.put(tp,
-                        new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch())));
+                MetadataResponse.PartitionMetadata partitionMetadata = currentInfo.get();
+                Node leader = partitionMetadata.leader();
+
+                if (leader == null) {
+                    log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
+                    metadata.requestUpdate();
+                    partitionsToRetry.add(tp);
+                } else if (client.isUnavailable(leader)) {
+                    client.maybeThrowAuthFailure(leader);
+
+                    // The connection has failed and we need to await the blackout period before we can
+                    // try again. No need to request a metadata update since the disconnect will have
+                    // done so already.
+                    log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
+                            leader, tp);
+                    partitionsToRetry.add(tp);
+                } else {
+                    partitionDataMap.put(tp,
+                            new ListOffsetRequest.PartitionData(offset, partitionMetadata.leaderEpoch()));
+                }
             }
         }
         return regroupPartitionMapByNode(partitionDataMap);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index e4e09a5..16ec87d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -19,10 +19,11 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.MetadataResponseData;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -314,8 +315,9 @@ public class MetadataResponse extends AbstractResponse {
 
     // This is used to describe per-partition state in the MetadataResponse
     public static class PartitionMetadata {
+        public final TopicPartition topicPartition;
+
         private final Errors error;
-        private final int partition;
         private final Node leader;
         private final Optional<Integer> leaderEpoch;
         private final List<Node> replicas;
@@ -323,14 +325,14 @@ public class MetadataResponse extends AbstractResponse {
         private final List<Node> offlineReplicas;
 
         public PartitionMetadata(Errors error,
-                                 int partition,
+                                 TopicPartition topicPartition,
                                  Node leader,
                                  Optional<Integer> leaderEpoch,
                                  List<Node> replicas,
                                  List<Node> isr,
                                  List<Node> offlineReplicas) {
             this.error = error;
-            this.partition = partition;
+            this.topicPartition = topicPartition;
             this.leader = leader;
             this.leaderEpoch = leaderEpoch;
             this.replicas = replicas;
@@ -338,12 +340,26 @@ public class MetadataResponse extends AbstractResponse {
             this.offlineReplicas = offlineReplicas;
         }
 
+        public PartitionMetadata withoutLeaderEpoch() {
+            return new PartitionMetadata(error,
+                    topicPartition,
+                    leader,
+                    Optional.empty(),
+                    replicas,
+                    isr,
+                    offlineReplicas);
+        }
+
         public Errors error() {
             return error;
         }
 
         public int partition() {
-            return partition;
+            return topicPartition.partition();
+        }
+
+        public String topic() {
+            return topicPartition.topic();
         }
 
         public int leaderId() {
@@ -372,9 +388,9 @@ public class MetadataResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(type=PartitionMetadata" +
+            return "PartitionMetadata(" +
                     ", error=" + error +
-                    ", partition=" + partition +
+                    ", partition=" + topicPartition +
                     ", leader=" + leader +
                     ", leaderEpoch=" + leaderEpoch +
                     ", replicas=" + Utils.join(replicas, ",") +
@@ -417,8 +433,9 @@ public class MetadataResponse extends AbstractResponse {
                     List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
                     List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
                     List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
-                    partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
-                            replicaNodes, isrNodes, offlineNodes));
+                    TopicPartition topicPartition = new TopicPartition(topic, partitionIndex);
+                    partitionMetadataList.add(new PartitionMetadata(partitionError, topicPartition, leaderNode,
+                            leaderEpoch, replicaNodes, isrNodes, offlineNodes));
                 }
 
                 topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
@@ -469,7 +486,7 @@ public class MetadataResponse extends AbstractResponse {
             for (PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata) {
                 metadataResponseTopic.partitions().add(new MetadataResponsePartition()
                     .setErrorCode(partitionMetadata.error.code())
-                    .setPartitionIndex(partitionMetadata.partition)
+                    .setPartitionIndex(partitionMetadata.partition())
                     .setLeaderId(partitionMetadata.leader == null ? -1 : partitionMetadata.leader.id())
                     .setLeaderEpoch(partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                     .setReplicaNodes(partitionMetadata.replicas.stream().map(Node::id).collect(Collectors.toList()))
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 7067e88..440ee55 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -196,9 +196,9 @@ public class MetadataTest {
             MetadataResponse response = new MetadataResponse(struct, version);
             assertFalse(response.hasReliableLeaderEpochs());
             metadata.update(response, 100);
-            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
-            assertEquals(-1, info.epoch());
+            assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+            MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.empty(), metadata.leaderEpoch());
         }
 
         for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
@@ -206,9 +206,9 @@ public class MetadataTest {
             MetadataResponse response = new MetadataResponse(struct, version);
             assertTrue(response.hasReliableLeaderEpochs());
             metadata.update(response, 100);
-            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
-            assertEquals(10, info.epoch());
+            assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+            MetadataResponse.PartitionMetadata info = metadata.partitionMetadataIfCurrent(tp).get();
+            assertEquals(Optional.of(10), info.leaderEpoch());
         }
     }
 
@@ -255,13 +255,13 @@ public class MetadataTest {
         metadata.update(new MetadataResponse(data), 101);
         assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
 
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-        MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+        MetadataResponse.PartitionMetadata metadata = this.metadata.partitionMetadataIfCurrent(tp).get();
 
-        List<Integer> cachedIsr = Arrays.stream(info.partitionInfo().inSyncReplicas())
+        List<Integer> cachedIsr = metadata.isr().stream()
                 .map(Node::id).collect(Collectors.toList());
         assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
-        assertEquals(10, info.epoch());
+        assertEquals(Optional.of(10), metadata.leaderEpoch());
     }
 
     @Test
@@ -419,14 +419,14 @@ public class MetadataTest {
         // Cache of partition stays, but current partition info is not available since it's stale
         assertNotNull(metadata.fetch().partition(tp));
         assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
         // Metadata with older epoch is rejected, metadata state is unchanged
         metadata.update(metadataResponse, 20L);
         assertNotNull(metadata.fetch().partition(tp));
         assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertFalse(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertFalse(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
 
         // Metadata with equal or newer epoch is accepted
@@ -434,7 +434,7 @@ public class MetadataTest {
         metadata.update(metadataResponse, 30L);
         assertNotNull(metadata.fetch().partition(tp));
         assertEquals(metadata.fetch().partitionCountForTopic("topic-1").longValue(), 5);
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
         assertEquals(metadata.lastSeenLeaderEpoch(tp).get().longValue(), 101);
     }
 
@@ -450,9 +450,9 @@ public class MetadataTest {
         assertFalse(metadata.lastSeenLeaderEpoch(tp).isPresent());
 
         // still works
-        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
-        assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().partition(), 0);
-        assertEquals(metadata.partitionInfoIfCurrent(tp).get().partitionInfo().leader().id(), 0);
+        assertTrue(metadata.partitionMetadataIfCurrent(tp).isPresent());
+        assertEquals(metadata.partitionMetadataIfCurrent(tp).get().partition(), 0);
+        assertEquals(metadata.partitionMetadataIfCurrent(tp).get().leader().id(), 0);
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8aa222e..913166b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -339,7 +339,7 @@ public class KafkaAdminClientTest {
             List<PartitionMetadata> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 PartitionMetadata pm = new PartitionMetadata(error,
-                        pInfo.partition(),
+                        new TopicPartition(topic, pInfo.partition()),
                         pInfo.leader(),
                         Optional.of(234),
                         Arrays.asList(pInfo.replicas()),
@@ -606,8 +606,8 @@ public class KafkaAdminClientTest {
             // Then we respond to the DescribeTopic request
             Node leader = initializedCluster.nodes().get(0);
             MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
-                    Errors.NONE, 0, leader, Optional.of(10), singletonList(leader),
-                    singletonList(leader), singletonList(leader));
+                    Errors.NONE, new TopicPartition(topic, 0), leader, Optional.of(10),
+                    singletonList(leader), singletonList(leader), singletonList(leader));
             env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
                     initializedCluster.clusterResource().clusterId(), 1,
                     singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
@@ -982,10 +982,10 @@ public class KafkaAdminClientTest {
             List<Node> nodes = env.cluster().nodes();
 
             List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>();
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0.partition(), nodes.get(0),
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp0, nodes.get(0),
                     Optional.of(5), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
                     Collections.emptyList()));
-            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1.partition(), nodes.get(1),
+            partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, tp1, nodes.get(1),
                     Optional.of(5), singletonList(nodes.get(1)), singletonList(nodes.get(1)), Collections.emptyList()));
 
             List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
@@ -1046,16 +1046,16 @@ public class KafkaAdminClientTest {
 
             List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
             List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, nodes.get(0), Optional.of(5),
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition0, nodes.get(0), Optional.of(5),
                     singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, nodes.get(0), Optional.of(5),
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition1, nodes.get(0), Optional.of(5),
                     singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
+            p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, myTopicPartition2, null,
                     Optional.empty(), singletonList(nodes.get(0)), singletonList(nodes.get(0)),
                     Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3, nodes.get(0), Optional.of(5),
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition3, nodes.get(0), Optional.of(5),
                     singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
-            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4, nodes.get(0), Optional.of(5),
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, myTopicPartition4, nodes.get(0), Optional.of(5),
                     singletonList(nodes.get(0)), singletonList(nodes.get(0)), Collections.emptyList()));
 
             t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7cde5b4..faec501 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1352,8 +1352,8 @@ public class ConsumerCoordinatorTest {
             subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
             Node node = new Node(0, "localhost", 9999);
             MetadataResponse.PartitionMetadata partitionMetadata =
-                new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node, Optional.empty(),
-                    singletonList(node), singletonList(node), singletonList(node));
+                new MetadataResponse.PartitionMetadata(Errors.NONE, new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+                        node, Optional.empty(), singletonList(node), singletonList(node), singletonList(node));
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
                 Topic.GROUP_METADATA_TOPIC_NAME, true, singletonList(partitionMetadata));
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 33d102d..ce90089 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -157,7 +157,7 @@ public class ConsumerMetadataTest {
 
     private MetadataResponse.TopicMetadata topicMetadata(String topic, boolean isInternal) {
         MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE,
-                0, node, Optional.of(5), singletonList(node), singletonList(node), singletonList(node));
+                new TopicPartition(topic, 0), node, Optional.of(5), singletonList(node), singletonList(node), singletonList(node));
         return new MetadataResponse.TopicMetadata(Errors.NONE, topic, isInternal, singletonList(partitionMetadata));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6440c42..ebad2e0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1947,7 +1947,7 @@ public class FetcherTest {
             for (MetadataResponse.PartitionMetadata p : partitions) {
                 altPartitions.add(new MetadataResponse.PartitionMetadata(
                     p.error(),
-                    p.partition(),
+                    new TopicPartition(item.topic(), p.partition()),
                     null, //no leader
                     Optional.empty(),
                     p.replicas(),
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1e88fc7..e5bd1df 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1169,13 +1169,15 @@ public class RequestResponseTest {
 
         List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
-                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node,
-                        Optional.of(5), replicas, isr, offlineReplicas))));
+                asList(new MetadataResponse.PartitionMetadata(Errors.NONE,
+                        new TopicPartition("__consumer_offsets", 1), node, Optional.of(5),
+                        replicas, isr, offlineReplicas))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.emptyList()));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false,
-            asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
-                Optional.empty(), replicas, isr, offlineReplicas))));
+            asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
+                    new TopicPartition("topic3", 0), null,
+                    Optional.empty(), replicas, isr, offlineReplicas))));
 
         return MetadataResponse.prepareResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
     }
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ece5af3..916feea 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -160,7 +160,7 @@ public class TestUtils {
                 Node leader = nodes.get(i % nodes.size());
                 List<Node> replicas = Collections.singletonList(leader);
                 partitionMetadata.add(partitionSupplier.supply(
-                        Errors.NONE, i, leader, Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
+                        Errors.NONE, tp, leader, Optional.ofNullable(epochSupplier.apply(tp)), replicas, replicas, replicas));
             }
 
             topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
@@ -179,7 +179,7 @@ public class TestUtils {
     @FunctionalInterface
     public interface PartitionMetadataSupplier {
         MetadataResponse.PartitionMetadata supply(Errors error,
-                              int partition,
+                              TopicPartition partition,
                               Node leader,
                               Optional<Integer> leaderEpoch,
                               List<Node> replicas,