Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/08 19:42:04 UTC

[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r568140154



##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -37,7 +38,10 @@ object FetchSession {
   type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
   type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
   type CACHE_MAP = ImplicitLinkedHashCollection[CachedPartition]
+  type UNRESOLVED_CACHE = util.HashSet[CachedUnresolvedPartition]
   type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]]
+  type TOPIC_ID_MAP = util.Map[String,Uuid]

Review comment:
       space after comma

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -187,24 +205,86 @@ class CachedPartition(val topic: String,
   }
 }
 
+/**
+ * Very similar to CachedPartition above, CachedUnresolvedPartition is used for incremental fetch requests.
+ * These objects store partitions that had topic IDs that could not be resolved by the broker.
+ *
+ * Upon each incremental request in the session, these partitions will be loaded. They can either be removed
+ * through resolving the partition with the broker's topicNames map or by receiving an unresolved toForget ID.
+ *
+ * Since these partitions signify an error, they will always be returned in the response.
+ */
+
+class CachedUnresolvedPartition(val topicId: Uuid,
+                      val partition: Int,
+                      var maxBytes: Int,
+                      var fetchOffset: Long,
+                      var leaderEpoch: Optional[Integer],
+                      var fetcherLogStartOffset: Long,
+                      var lastFetchedEpoch: Optional[Integer]) {
+
+  def this(id: Uuid, partition: Int) =
+    this(id, partition, -1, -1, Optional.empty(), -1, Optional.empty[Integer])
+
+  def this(id: Uuid, partition: Int, reqData: FetchRequest.PartitionData) =
+    this(id, partition, reqData.maxBytes, reqData.fetchOffset,
+      reqData.currentLeaderEpoch, reqData.logStartOffset, reqData.lastFetchedEpoch)
+
+  def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+
+  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
+    // Update our cached request parameters.
+    maxBytes = reqData.maxBytes
+    fetchOffset = reqData.fetchOffset
+    fetcherLogStartOffset = reqData.logStartOffset
+    leaderEpoch = reqData.currentLeaderEpoch
+    lastFetchedEpoch = reqData.lastFetchedEpoch
+  }
+
+  override def hashCode: Int = (31 * partition) + topicId.hashCode
+
+  def canEqual(that: Any) = that.isInstanceOf[CachedUnresolvedPartition]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CachedUnresolvedPartition =>
+        this.eq(that) ||
+          (that.canEqual(this) &&
+            this.partition.equals(that.partition) &&
+            this.topicId.equals(that.topicId))
+      case _ => false
+    }
+
+  override def toString: String = synchronized {
+    "CachedPartition(Id=" + topicId +
+      ", partition=" + partition +
+      ", maxBytes=" + maxBytes +
+      ", fetchOffset=" + fetchOffset +
+      ", fetcherLogStartOffset=" + fetcherLogStartOffset +
+      ")"
+  }
+}
+
 /**
   * The fetch session.
   *
   * Each fetch session is protected by its own lock, which must be taken before mutable
   * fields are read or modified.  This includes modification of the session partition map.
   *
-  * @param id           The unique fetch session ID.
-  * @param privileged   True if this session is privileged.  Sessions crated by followers
-  *                     are privileged; sesssion created by consumers are not.
-  * @param partitionMap The CachedPartitionMap.
-  * @param creationMs   The time in milliseconds when this session was created.
-  * @param lastUsedMs   The last used time in milliseconds.  This should only be updated by
-  *                     FetchSessionCache#touch.
-  * @param epoch        The fetch session sequence number.
+  * @param id                     The unique fetch session ID.
+  * @param privileged             True if this session is privileged.  Sessions crated by followers
+  *                               are privileged; sesssion created by consumers are not.
+  * @param partitionMap           The CachedPartitionMap.
+ *  @param unresolvedPartitions   The CachedUnresolvedPartitionMap
+  * @param creationMs             The time in milliseconds when this session was created.
+  * @param lastUsedMs             The last used time in milliseconds.  This should only be updated by
+  *                               FetchSessionCache#touch.
+  * @param epoch                  The fetch session sequence number.
   */
 class FetchSession(val id: Int,
                    val privileged: Boolean,
                    val partitionMap: FetchSession.CACHE_MAP,
+                   val unresolvedPartitions: FetchSession.UNRESOLVED_CACHE,

Review comment:
       Since we are adding some complexity, it would be useful to make the code a bit easier to understand for other people. For example, perhaps we could add comments to explain (1) what partitions will be included in unresolvedPartitions vs partitionMap? (2) are partitions mutually exclusive between unresolvedPartitions and partitionMap? (3) how are partitions in unresolvedPartitions and partitionMap handled differently in the fetch response?
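   
   For reference, a sketch of the kind of comment that could answer (1)-(3), with wording derived from the CachedUnresolvedPartition docs added earlier in this PR (illustrative, not from the PR itself):
   
       // partitionMap holds partitions whose topic ID resolved to a name;
       // unresolvedPartitions holds partitions whose ID could not be resolved.
       // The two are mutually exclusive: a partition moves from
       // unresolvedPartitions to partitionMap once its ID resolves, or is
       // dropped if the client forgets it. Partitions in partitionMap are
       // fetched normally, while partitions in unresolvedPartitions are
       // always answered with a topic ID error in the response.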

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -851,9 +854,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (fetchRequest.isFromFollower) {
         // We've already evaluated against the quota and are good to go. Just need to record it now.
         unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
-        val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
+        val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader, fetchContext.getIdErrors().asScala.toList, topicNames, topicIds)
         quotas.leader.record(responseSize)
-        trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData.size}, " +
+        trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.resolvedResponseData.size}, " +

Review comment:
       Perhaps we could log both the resolved and the unresolved partitions' sizes.

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -189,6 +189,14 @@ class MetadataCache(brokerId: Int) extends Logging {
     }.toSet
   }
 
+  def getTopicIds(): Map[String, Uuid] = {

Review comment:
       getTopicIds => topicNamesToIds?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
-  * @param fetchData          The partition data from the fetch request.
+  * @param fetchDataAndError  The partition data and topic ID errors from the fetch request.
+  * @param topicIds           The map from topic name to topic IDs
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
-                       private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData],
+                       private val fetchDataAndError: FetchDataAndError,
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends FetchContext {
+  val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values())
   override def getFetchOffset(part: TopicPartition): Option[Long] =
-    Option(fetchData.get(part)).map(_.fetchOffset)
+    Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+    fetchDataAndError.fetchData.forEach(fun(_, _))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, idErrors, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
     def createNewSession: FetchSession.CACHE_MAP = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
       updates.forEach { (part, respData) =>
-        val reqData = fetchData.get(part)
-        cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+        val reqData = fetchDataAndError.fetchData.get(part)
+        cachedPartitions.mustAdd(new CachedPartition(part, topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID), reqData, respData))

Review comment:
       `topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID)`
   
   If we always expect the topic to be found in topicIds, we should just throw an exception instead of using a default. If a missing topic is expected, we should probably convert it to an unresolved partition?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID
+      val unresolvedIterator = unresolvedPartitions.iterator()
+      while (unresolvedIterator.hasNext()) {
+        val partition = unresolvedIterator.next()
+
+        // Remove from unresolvedPartitions if ID is unresolved in toForgetIds
+        val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId)
+        if (forgetPartitions != null && forgetPartitions.contains(partition.partition))
+          unresolvedIterator.remove()
+
+        // Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap
+        // and remove from unresolvedPartitions.
+        else if (topicNames.get(partition.topicId) != null) {
+          val newTp = new TopicPartition(topicNames.get(partition.topicId), partition.partition)
+          val newCp = new CachedPartition(newTp, partition.topicId, partition.reqData)
+          partitionMap.add(newCp)
+          added.add(newTp)
+          unresolvedIterator.remove()
+        } else {
+          val idError = fetchDataAndError.idErrors.get(partition.topicId)

Review comment:
       Do we need this part of the logic? It seems that the same is already done through FetchRequest.fetchDataAndError().

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -133,6 +200,71 @@ public String toString() {
         return Collections.unmodifiableMap(result);
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> {
+            String name = topicNames.get(fetchTopic.topicId());
+            if (name != null) {
+                fetchTopic.partitions().forEach(fetchPartition ->
+                    result.put(new TopicPartition(name, fetchPartition.partition()),
+                        new PartitionData(
+                                fetchPartition.fetchOffset(),
+                                fetchPartition.logStartOffset(),
+                                fetchPartition.partitionMaxBytes(),
+                                optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                                optionalEpoch(fetchPartition.lastFetchedEpoch())
+                        )
+                    )
+                );
+            }
+        });
+        return Collections.unmodifiableMap(result);
+    }
+
+    // Only used when Fetch is version 13 or greater.
+    private FetchDataAndError toPartitionDataMapAndError(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) {
+        Map<TopicPartition, PartitionData> fetchData = new LinkedHashMap<>();
+        List<UnresolvedPartitions> unresolvedPartitions = new LinkedList<>();
+        Map<Uuid, FetchResponse.IdError> idErrors = new HashMap<>();
+        Errors error;
+        if (topicNames.isEmpty()) {
+            error = Errors.UNSUPPORTED_VERSION;
+        } else {
+            error = Errors.UNKNOWN_TOPIC_ID;
+        }
+        fetchableTopics.forEach(fetchTopic -> {
+            String name = topicNames.get(fetchTopic.topicId());
+            if (name != null) {
+                fetchTopic.partitions().forEach(fetchPartition ->
+                        fetchData.put(new TopicPartition(name, fetchPartition.partition()),
+                                new PartitionData(
+                                        fetchPartition.fetchOffset(),
+                                        fetchPartition.logStartOffset(),
+                                        fetchPartition.partitionMaxBytes(),
+                                        optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                                        optionalEpoch(fetchPartition.lastFetchedEpoch())
+                                )
+                        )
+                );
+            } else {
+                unresolvedPartitions.add(new UnresolvedPartitions(fetchTopic.topicId(), fetchTopic.partitions().stream().collect(Collectors.toMap(
+                        FetchRequestData.FetchPartition::partition, fetchPartition -> new PartitionData(
+                        fetchPartition.fetchOffset(),
+                        fetchPartition.logStartOffset(),
+                        fetchPartition.partitionMaxBytes(),
+                        optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                        optionalEpoch(fetchPartition.lastFetchedEpoch()))))));
+
+                if (idErrors.containsKey(fetchTopic.topicId()))
+                    idErrors.get(fetchTopic.topicId()).addPartitions(fetchTopic.partitions().stream().map(part -> part.partition()).collect(Collectors.toList()));
+                else
+                    idErrors.put(fetchTopic.topicId(), new FetchResponse.IdError(fetchTopic.topicId(), fetchTopic.partitions().stream().map(part -> part.partition()).collect(Collectors.toList()),

Review comment:
       Hmm, why do we need to do collect() at the end? 
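   
   For illustration, one way the intermediate lists could be avoided, assuming a hypothetical single-element `IdError#addPartition(int)` helper (the PR as quoted only offers addPartitions(List)):
   
       // computeIfAbsent creates the IdError once, then partitions are
       // added directly without building a temporary List
       FetchResponse.IdError idError = idErrors.computeIfAbsent(
               fetchTopic.topicId(),
               id -> new FetchResponse.IdError(id, new ArrayList<>(), error));
       fetchTopic.partitions().forEach(part -> idError.addPartition(part.partition()));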

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -270,24 +314,40 @@ public T records() {
      */
     public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
+                         List<IdError> idErrors,

Review comment:
       Should we add the new params to the javadoc above? In particular, could we explain the relationship between responseData and idErrors? Also, could we name the params more clearly? For example, responseData => partitionsWithMatchingTopicId, idErrors => partitionsWithoutMatchingTopicId.
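   
   A possible shape for the javadoc additions (wording is a sketch, not from the PR):
   
       * @param partitionsWithMatchingTopicId    The fetched data, keyed by partition, for
       *                                         topics whose ID resolved to a name.
       * @param partitionsWithoutMatchingTopicId Per-topic-ID errors for partitions whose
       *                                         ID did not resolve; disjoint from the map above.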

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID
+      val unresolvedIterator = unresolvedPartitions.iterator()
+      while (unresolvedIterator.hasNext()) {
+        val partition = unresolvedIterator.next()
+
+        // Remove from unresolvedPartitions if ID is unresolved in toForgetIds
+        val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId)
+        if (forgetPartitions != null && forgetPartitions.contains(partition.partition))
+          unresolvedIterator.remove()
+
+        // Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap
+        // and remove from unresolvedPartitions.

Review comment:
       It's a bit weird to add a comment that breaks the if/else clause. Perhaps we could put the comment inside the `else if`?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -259,8 +262,49 @@ public T records() {
         }
     }
 
+    public static final class IdError {
+        private final Uuid id;
+        private final Set<Integer> partitions;
+        private final Errors error;
+
+        public IdError(Uuid id,
+                List<Integer> partitions,
+                Errors error) {
+            this.id = id;
+            this.partitions = new HashSet<>(partitions);
+            this.error = error;
+        }
+
+        public Uuid id() {
+            return this.id;
+        }
+
+        public Set<Integer> partitions() {
+            return this.partitions;
+        }
+
+        public void addPartitions(List<Integer> partitions) {
+            partitions.forEach(partition -> {
+                partitions.add(partition);
+            });
+        }
+
+        private List<FetchResponseData.FetchablePartitionResponse> errorData() {
+            return partitions.stream().map(partition -> new FetchResponseData.FetchablePartitionResponse()
+                    .setPartition(partition)
+                    .setErrorCode(error.code())
+                    .setHighWatermark(FetchResponse.INVALID_HIGHWATERMARK)
+                    .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                    .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                    .setAbortedTransactions(null)
+                    .setPreferredReadReplica(FetchResponse.INVALID_PREFERRED_REPLICA_ID)
+                    .setRecordSet(MemoryRecords.EMPTY)).collect(Collectors.toList());
+        }
+
+    }
+
     /**
-     * From version 3 or later, the entries in `responseData` should be in the same order as the entries in
+     * From version 3 or later, the 'interesting' entries in `responseData` should be in the same order as the entries in

Review comment:
       What's the definition of 'interesting'?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -71,6 +75,7 @@ object FetchSession {
   * localLogStartOffset is the log start offset of the partition on this broker.
   */
 class CachedPartition(val topic: String,
+                      var topicId: Uuid,

Review comment:
       Should we change `hashCode()` and `equals()` to include topicId?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.

Review comment:
       It seems that the following code makes changes to unresolvedPartitions, not topic IDs.
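   
   For example, the comment could be rephrased along these lines (a sketch of the wording):
   
       // For request versions >= 13, reconcile unresolvedPartitions against the
       // forget list and the broker's current topic name mapping. Older request
       // versions carry no topic IDs, so leave unresolvedPartitions untouched.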

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -270,24 +314,40 @@ public T records() {
      */
     public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
+                         List<IdError> idErrors,
+                         Map<String, Uuid> topicIds,
                          int throttleTimeMs,
                          int sessionId) {
         super(ApiKeys.FETCH);
-        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), idErrors, topicIds, sessionId);
         this.responseDataMap = responseData;
     }
 
     public FetchResponse(FetchResponseData fetchResponseData) {
         super(ApiKeys.FETCH);
         this.data = fetchResponseData;
-        this.responseDataMap = toResponseDataMap(fetchResponseData);
+        if (!supportsTopicIds()) {
+            this.responseDataMap = toResponseDataMap(fetchResponseData);
+        } else {
+            this.responseDataMap = null;
+        }
     }
 
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
 
-    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
+    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData(Map<Uuid, String> topicNames) {

Review comment:
       topicNames => topicIdToNameMap?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -133,6 +200,71 @@ public String toString() {
         return Collections.unmodifiableMap(result);
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> {
+            String name = topicNames.get(fetchTopic.topicId());
+            if (name != null) {
+                fetchTopic.partitions().forEach(fetchPartition ->
+                    result.put(new TopicPartition(name, fetchPartition.partition()),
+                        new PartitionData(
+                                fetchPartition.fetchOffset(),
+                                fetchPartition.logStartOffset(),
+                                fetchPartition.partitionMaxBytes(),
+                                optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                                optionalEpoch(fetchPartition.lastFetchedEpoch())
+                        )
+                    )
+                );
+            }
+        });
+        return Collections.unmodifiableMap(result);
+    }
+
+    // Only used when Fetch is version 13 or greater.
+    private FetchDataAndError toPartitionDataMapAndError(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) {
+        Map<TopicPartition, PartitionData> fetchData = new LinkedHashMap<>();
+        List<UnresolvedPartitions> unresolvedPartitions = new LinkedList<>();
+        Map<Uuid, FetchResponse.IdError> idErrors = new HashMap<>();
+        Errors error;
+        if (topicNames.isEmpty()) {
+            error = Errors.UNSUPPORTED_VERSION;
+        } else {
+            error = Errors.UNKNOWN_TOPIC_ID;
+        }
+        fetchableTopics.forEach(fetchTopic -> {
+            String name = topicNames.get(fetchTopic.topicId());
+            if (name != null) {
+                fetchTopic.partitions().forEach(fetchPartition ->
+                        fetchData.put(new TopicPartition(name, fetchPartition.partition()),
+                                new PartitionData(
+                                        fetchPartition.fetchOffset(),
+                                        fetchPartition.logStartOffset(),
+                                        fetchPartition.partitionMaxBytes(),
+                                        optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                                        optionalEpoch(fetchPartition.lastFetchedEpoch())
+                                )
+                        )
+                );
+            } else {
+                unresolvedPartitions.add(new UnresolvedPartitions(fetchTopic.topicId(), fetchTopic.partitions().stream().collect(Collectors.toMap(
+                        FetchRequestData.FetchPartition::partition, fetchPartition -> new PartitionData(
+                        fetchPartition.fetchOffset(),
+                        fetchPartition.logStartOffset(),
+                        fetchPartition.partitionMaxBytes(),
+                        optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                        optionalEpoch(fetchPartition.lastFetchedEpoch()))))));
+
+                if (idErrors.containsKey(fetchTopic.topicId()))
+                    idErrors.get(fetchTopic.topicId()).addPartitions(fetchTopic.partitions().stream().map(part -> part.partition()).collect(Collectors.toList()));

Review comment:
       Hmm, why do we need to do collect() at the end? The returned value doesn't seem to be used.

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +241,34 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
+        private Map<Uuid, String> topicNames;
+        private Map<String, Set<Integer>> partitionsPerTopic;
         private final boolean copySessionPartitions;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();

Review comment:
       Now that the constructor code is a bit more involved, perhaps we could just forward `Builder()` to `Builder(int initialSize, boolean copySessionPartitions)`?
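   
   For example (a sketch; 16 is an assumed default initial capacity, the PR does not pick one):
   
       Builder() {
           // forward to the general constructor so the field
           // initialization lives in one place
           this(16, true);
       }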

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +241,34 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
+        private Map<Uuid, String> topicNames;
+        private Map<String, Set<Integer>> partitionsPerTopic;
         private final boolean copySessionPartitions;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
+            this.topicNames = new HashMap<>();
+            this.partitionsPerTopic = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
+            this.topicNames = new HashMap<>(initialSize);
+            this.partitionsPerTopic = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
-            next.put(topicPartition, data);
+        public void add(TopicPartition topicPartition, Uuid id, PartitionData data) {

Review comment:
       id => topicId ?

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -189,6 +189,14 @@ class MetadataCache(brokerId: Int) extends Logging {
     }.toSet
   }
 
+  def getTopicIds(): Map[String, Uuid] = {
+    metadataSnapshot.topicIds
+  }
+
+  def getTopicNames(): Map[Uuid, String] = {

Review comment:
       getTopicNames => topicIdsToNames?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +76,25 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);

Review comment:
       We could probably just get rid of the `session` prefix since it's part of the session object. Ditto below.

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +76,25 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);

Review comment:
       Is there a reason to use 0 instead of the default capacity for the hashmap?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3419,10 +3422,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 object KafkaApis {
   // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
   // traffic doesn't exceed quota.
-  private[server] def sizeOfThrottledPartitions(versionId: Short,
+  private[server] def sizeOfThrottledPartitions[T >: MemoryRecords <: BaseRecords](versionId: Short,
                                                 unconvertedResponse: FetchResponse[Records],
-                                                quota: ReplicationQuotaManager): Int = {
-    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
-      .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava)
+                                                quota: ReplicationQuotaManager,
+                                                idErrors: List[FetchResponse.IdError],
+                                                topicNames: util.Map[Uuid, String],

Review comment:
       It seems that topicNames is unused?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID

Review comment:
       Hmm, not sure if we need to distinguish here. It seems that it's easier to just always send an UNKNOWN_TOPIC_ID since the propagation of all topic IDs could be delayed?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
-  * @param fetchData          The partition data from the fetch request.
+  * @param fetchDataAndError  The partition data and topic ID errors from the fetch request.
+  * @param topicIds           The map from topic name to topic IDs
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
-                       private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData],
+                       private val fetchDataAndError: FetchDataAndError,
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends FetchContext {
+  val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values())
   override def getFetchOffset(part: TopicPartition): Option[Long] =
-    Option(fetchData.get(part)).map(_.fetchOffset)
+    Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+    fetchDataAndError.fetchData.forEach(fun(_, _))

Review comment:
       Now that we are changing the semantics for this method to only iterating resolved partitions, it would be useful to have a more appropriate method name to make it clear.
   
   Also, it seems that some of the callers need to iterate all partitions including unresolved ones (e.g., those checking for CLUSTER ACTION permissions) while some others need to iterate only resolved ones (e.g., those checking for topic-level permissions).

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -110,7 +116,68 @@ public String toString() {
         }
     }
 
-    private Optional<Integer> optionalEpoch(int rawEpochValue) {
+    public static final class UnresolvedPartitions {

Review comment:
       It seems this is about a topic. So UnresolvedPartitions would be better named UnresolvedTopic?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -265,9 +353,32 @@ public FetchRequestData build() {
             Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions
                     ? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions))
                     : Collections.unmodifiableMap(sessionPartitions);
+            Map<String, Uuid> toSendTopicIds =
+                    Collections.unmodifiableMap(new HashMap<>(sessionTopicIds));
+            Map<Uuid, String> toSendTopicNames =
+                    Collections.unmodifiableMap(new HashMap<>(sessionTopicNames));
+            boolean canUseTopicIds = sessionPartitionsPerTopic.size() == toSendTopicIds.size();
+
             next = null;
+            topicIds = null;
+            topicNames = null;
+            partitionsPerTopic = null;
             return new FetchRequestData(toSend, Collections.unmodifiableList(removed),
-                curSessionPartitions, nextMetadata);
+                curSessionPartitions, toSendTopicIds, toSendTopicNames, nextMetadata, canUseTopicIds);
+        }
+
+        private void addPartitionsAndIds(Map<String, Set<Integer>> partitionsPerTopic,
+                                         TopicPartition tp, Uuid id, Map<String, Uuid> topicIds,
+                                         Map<Uuid, String> topicNames) {
+            if (partitionsPerTopic.containsKey(tp.topic())) {
+                partitionsPerTopic.get(tp.topic()).add(tp.partition());
+            } else {
+                partitionsPerTopic.put(tp.topic(), new HashSet<>(tp.partition()));

Review comment:
       Hmm, we should be adding tp.partition() to the hashset and not using it for the initial capacity, right?
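   
   A minimal sketch of the fix; computeIfAbsent also collapses the two branches:
   
       // add the partition to the topic's set, creating the set on first use
       partitionsPerTopic
           .computeIfAbsent(tp.topic(), topic -> new HashSet<>())
           .add(tp.partition());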

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -270,24 +314,40 @@ public T records() {
      */
     public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
+                         List<IdError> idErrors,
+                         Map<String, Uuid> topicIds,
                          int throttleTimeMs,
                          int sessionId) {
         super(ApiKeys.FETCH);
-        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), idErrors, topicIds, sessionId);
         this.responseDataMap = responseData;
     }
 
     public FetchResponse(FetchResponseData fetchResponseData) {
         super(ApiKeys.FETCH);
         this.data = fetchResponseData;
-        this.responseDataMap = toResponseDataMap(fetchResponseData);
+        if (!supportsTopicIds()) {
+            this.responseDataMap = toResponseDataMap(fetchResponseData);
+        } else {
+            this.responseDataMap = null;
+        }
     }
 
     public Errors error() {
         return Errors.forCode(data.errorCode());
     }
 
-    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData() {
+    public LinkedHashMap<TopicPartition, PartitionData<T>> responseData(Map<Uuid, String> topicNames) {
+        if (!supportsTopicIds())
+            return responseDataMap;
+        return toResponseDataMap(data, topicNames);
+
+    }
+
+    // Used when we can guarantee responseData is populated with all possible partitions
+    // This occurs when we have a response version < 13 or we built the FetchResponse with
+    // responseDataMap as a parameter and we have the same topic IDs available.
+    public LinkedHashMap<TopicPartition, PartitionData<T>> resolvedResponseData() {

Review comment:
       This method is kind of weird. It's only used in KafkaApis where topic name has already been resolved. The only reason for this method is that FetchContext.updateAndGenerateResponseData() generates FetchResponse, which is used in createResponse(). Instead, could we have FetchContext.updateAndGenerateResponseData() return a different class that includes the resolved partitions?
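   
   As an illustration of that idea, a small result type could carry both (name and shape are hypothetical, not from the PR):
   
       public static final class ResolvedResponseData<T extends BaseRecords> {
           // the full response, including unresolved-topic-ID errors
           public final FetchResponse<T> response;
           // partitions already resolved to topic names, in response order
           public final LinkedHashMap<TopicPartition, FetchResponse.PartitionData<T>> resolvedPartitions;
   
           public ResolvedResponseData(FetchResponse<T> response,
                                       LinkedHashMap<TopicPartition, FetchResponse.PartitionData<T>> resolvedPartitions) {
               this.response = response;
               this.resolvedPartitions = resolvedPartitions;
           }
       }
   
   FetchContext.updateAndGenerateResponseData() could then return this type, and KafkaApis would read resolvedPartitions directly instead of calling resolvedResponseData().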

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -725,10 +915,12 @@ class FetchSessionCache(private val maxEntries: Int,
 
 class FetchManager(private val time: Time,
                    private val cache: FetchSessionCache) extends Logging {
-  def newContext(reqMetadata: JFetchMetadata,
-                 fetchData: FetchSession.REQ_MAP,
-                 toForget: util.List[TopicPartition],
-                 isFollower: Boolean): FetchContext = {
+  def newContext(request: FetchRequest,

Review comment:
       It's kind of weird to pass a request into FetchSession.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -353,50 +508,68 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
   * @param time               The clock to use.
   * @param cache              The fetch session cache.
   * @param reqMetadata        The request metadata.
-  * @param fetchData          The partition data from the fetch request.
+  * @param fetchDataAndError  The partition data and topic ID errors from the fetch request.
+  * @param topicIds           The map from topic name to topic IDs
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
                        private val cache: FetchSessionCache,
                        private val reqMetadata: JFetchMetadata,
-                       private val fetchData: util.Map[TopicPartition, FetchRequest.PartitionData],
+                       private val fetchDataAndError: FetchDataAndError,
+                       private val topicIds: util.Map[String, Uuid],
                        private val isFromFollower: Boolean) extends FetchContext {
+  val idErrors = new util.LinkedList(fetchDataAndError.idErrors.values())
   override def getFetchOffset(part: TopicPartition): Option[Long] =
-    Option(fetchData.get(part)).map(_.fetchOffset)
+    Option(fetchDataAndError.fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.forEach(fun(_, _))
+    fetchDataAndError.fetchData.forEach(fun(_, _))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator, idErrors, topicIds)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
     def createNewSession: FetchSession.CACHE_MAP = {
       val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
       updates.forEach { (part, respData) =>
-        val reqData = fetchData.get(part)
-        cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
+        val reqData = fetchDataAndError.fetchData.get(part)
+        cachedPartitions.mustAdd(new CachedPartition(part, topicIds.getOrDefault(part.topic(), Uuid.ZERO_UUID), reqData, respData))
       }
       cachedPartitions
     }
+    def createNewSessionIdErrors: FetchSession.UNRESOLVED_CACHE = {

Review comment:
       Could we name the methods better to make it easier to understand? For example,
   
   createNewSession => generateResolvedPartitions
   createNewSessionIdErrors => generateUnresolvedPartitions

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -662,11 +662,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     val versionId = request.header.apiVersion
     val clientId = request.header.clientId
     val fetchRequest = request.body[FetchRequest]
+    val topicNames = if (fetchRequest.version() >= 13) metadataCache.getTopicNames().asJava else Collections.emptyMap[Uuid, String]()
+    val topicIds = if (fetchRequest.version() >= 13) metadataCache.getTopicIds().asJava else Collections.emptyMap[String, Uuid]()

Review comment:
       The metadata cache could change between the two calls. Could we have a single call to metadata cache that returns both topicNames and topicIds?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -110,7 +116,68 @@ public String toString() {
         }
     }
 
-    private Optional<Integer> optionalEpoch(int rawEpochValue) {
+    public static final class UnresolvedPartitions {
+        private final Uuid id;
+        private final Map<Integer, PartitionData> partitionData;
+
+        public UnresolvedPartitions(Uuid id, Map<Integer, PartitionData> partitionData) {
+            this.id = id;
+            this.partitionData = partitionData;
+        }
+
+        public Uuid id() {
+            return id;
+        }
+
+        public Map<Integer, PartitionData> partitionData() {
+            return partitionData;
+        }
+    }
+
+
+    public static final class FetchDataAndError {
+        private final Map<TopicPartition, PartitionData> fetchData;
+        private final List<UnresolvedPartitions> unresolvedPartitions;
+        private final Map<Uuid, FetchResponse.IdError> idErrors;
+
+        public FetchDataAndError(Map<TopicPartition, PartitionData> fetchData, List<UnresolvedPartitions> unresolvedPartitions, Map<Uuid, FetchResponse.IdError> idErrors) {
+            this.fetchData = fetchData;
+            this.unresolvedPartitions = unresolvedPartitions;
+            this.idErrors = idErrors;
+        }
+
+        public final Map<TopicPartition, PartitionData> fetchData() {
+            return fetchData;
+        }
+
+        public final List<UnresolvedPartitions> unresolvedPartitions() {
+            return unresolvedPartitions;
+        }
+
+        public final Map<Uuid, FetchResponse.IdError> idErrors() {
+            return idErrors;
+        }
+    }
+
+    public static final class ToForgetAndIds {
+        private final List<TopicPartition> toForget;
+        private final Map<Uuid, Set<Integer>> toForgetIds;

Review comment:
       It seems that toForgetIds is intended for unresolved topic IDs. Could we name it more clearly alongside toForget and add a comment?
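   
   For example (suggested names and comments only, not from the PR):
   
       // partitions to forget whose topic ID resolved to a name
       private final List<TopicPartition> resolvedToForget;
       // partitions to forget whose topic ID could not be resolved, keyed by ID
       private final Map<Uuid, Set<Integer>> unresolvedToForgetIds;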

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -143,34 +275,71 @@ public String toString() {
         return result;
     }
 
+    private List<TopicPartition> toForgottenTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics, Map<Uuid, String> topicNames) {
+        List<TopicPartition> result = new ArrayList<>();
+        forgottenTopics.forEach(forgottenTopic ->
+                forgottenTopic.partitions().forEach(partitionId -> {
+                    String name = topicNames.get(forgottenTopic.topicId());
+                    if (name != null)
+                        result.add(new TopicPartition(forgottenTopic.topic(), partitionId));
+                })
+        );
+        return result;
+    }
+
+    // Only used when Fetch is version 13 or greater.
+    private ToForgetAndIds toForgottenTopicListAndIds(List<FetchRequestData.ForgottenTopic> forgottenTopics, Map<Uuid, String> topicNames) {
+        List<TopicPartition> result = new ArrayList<>();
+        Map<Uuid, Set<Integer>> unresolvedIds = new HashMap<>();
+        forgottenTopics.forEach(forgottenTopic -> {
+            Set<Integer> partitions = new HashSet<>();
+            forgottenTopic.partitions().forEach(partitionId -> {
+                String name = topicNames.get(forgottenTopic.topicId());
+                if (name != null)
+                    result.add(new TopicPartition(forgottenTopic.topic(), partitionId));
+                else
+                    partitions.add(partitionId);
+            });
+            if (unresolvedIds.containsKey(forgottenTopic.topicId())) {
+                unresolvedIds.get(forgottenTopic.topicId()).addAll(partitions);
+            } else {
+                unresolvedIds.put(forgottenTopic.topicId(), partitions);

Review comment:
       I am not sure that I follow the logic here. It seems that we always put the forgotten topic into unresolvedIds. It seems that we should check the partitions size first? Also, perhaps rename partitions to something like unresolvedPartitions?
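   
   A sketch of the bookkeeping that seems intended, recording the ID only when some of its partitions actually failed to resolve:
   
       if (!partitions.isEmpty()) {
           unresolvedIds
               .computeIfAbsent(forgottenTopic.topicId(), id -> new HashSet<>())
               .addAll(partitions);
       }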

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {
+      val error = if (topicNames.isEmpty) Errors.UNSUPPORTED_VERSION else Errors.UNKNOWN_TOPIC_ID
+      val unresolvedIterator = unresolvedPartitions.iterator()
+      while (unresolvedIterator.hasNext()) {
+        val partition = unresolvedIterator.next()
+
+        // Remove from unresolvedPartitions if ID is unresolved in toForgetIds
+        val forgetPartitions = toForgetAndIds.toForgetIds.get(partition.topicId)
+        if (forgetPartitions != null && forgetPartitions.contains(partition.partition))
+          unresolvedIterator.remove()
+
+        // Try to resolve ID, if there is a name for the given ID, place a CachedPartition in partitionMap
+        // and remove from unresolvedPartitions.
+        else if (topicNames.get(partition.topicId) != null) {
+          val newTp = new TopicPartition(topicNames.get(partition.topicId), partition.partition)
+          val newCp = new CachedPartition(newTp, partition.topicId, partition.reqData)
+          partitionMap.add(newCp)

Review comment:
       Should we use mustAdd()?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -187,24 +205,86 @@ class CachedPartition(val topic: String,
   }
 }
 
+/**
+ * Very similar to CachedPartition above, CachedUnresolvedPartition is used for incremental fetch requests.
+ * These objects store partitions that had topic IDs that could not be resolved by the broker.
+ *
+ * Upon each incremental request in the session, these partitions will be loaded. They can either be removed
+ * through resolving the partition with the broker's topicNames map or by receiving an unresolved toForget ID.
+ *
+ * Since these partitions signify an error, they will always be returned in the response.
+ */
+
+class CachedUnresolvedPartition(val topicId: Uuid,
+                      val partition: Int,
+                      var maxBytes: Int,
+                      var fetchOffset: Long,
+                      var leaderEpoch: Optional[Integer],
+                      var fetcherLogStartOffset: Long,
+                      var lastFetchedEpoch: Optional[Integer]) {
+
+  def this(id: Uuid, partition: Int) =
+    this(id, partition, -1, -1, Optional.empty(), -1, Optional.empty[Integer])
+
+  def this(id: Uuid, partition: Int, reqData: FetchRequest.PartitionData) =
+    this(id, partition, reqData.maxBytes, reqData.fetchOffset,
+      reqData.currentLeaderEpoch, reqData.logStartOffset, reqData.lastFetchedEpoch)
+
+  def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes, leaderEpoch, lastFetchedEpoch)
+
+  def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {

Review comment:
       This seems unused?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -398,7 +536,7 @@ private String responseDataToLogString(FetchResponse<?> response) {
      * @return          True if the response is well-formed; false if it can't be processed
      *                  because of missing or unexpected partitions.
      */
-    public boolean handleResponse(FetchResponse<?> response) {
+    public boolean handleResponse(FetchResponse<?> response, short version) {

Review comment:
       Could we add the new param to the javadoc?
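   
   For example (a sketch of the wording):
   
       * @param version   The version of the fetch request that produced this
       *                  response, which determines whether topic IDs are expected.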

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -237,14 +317,80 @@ class FetchSession(val id: Int,
   type TL = util.ArrayList[TopicPartition]
 
   // Update the cached partition data based on the request.
-  def update(fetchData: FetchSession.REQ_MAP,
-             toForget: util.List[TopicPartition],
-             reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+  def update(version: Short,
+             fetchDataAndError: FetchDataAndError,
+             toForgetAndIds: ToForgetAndIds,
+             reqMetadata: JFetchMetadata,
+             topicIds: util.Map[String, Uuid],
+             topicNames: util.Map[Uuid, String]): (TL, TL, TL) = synchronized {
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.forEach { (topicPart, reqData) =>
-      val newCachedPart = new CachedPartition(topicPart, reqData)
+
+    // Only make changes to topic IDs if we have a new request version.
+    // If we receive an old request version, ignore all topic ID code, keep IDs that are there.
+    if (version >= 13) {

Review comment:
       Hmm, do we need to check version here? FetchRequest.fetchDataAndError() already checked the version.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -425,16 +598,26 @@ class IncrementalFetchContext(private val time: Time,
         val topicPart = element.getKey
         val respData = element.getValue
         val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
-        val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
-        if (mustRespond) {
+
+        if (cachedPart.topicId == Uuid.ZERO_UUID)
+          cachedPart.addId(topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID))
+
+        if (cachedPart.topicId != topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)) {
           nextElement = element
-          if (updateFetchContextAndRemoveUnselected) {
-            session.partitionMap.remove(cachedPart)
-            session.partitionMap.mustAdd(cachedPart)
-          }
+          session.partitionMap.remove(cachedPart)
+          iter.remove()

Review comment:
       This excludes the partition in the response. However, it seems we need to send an error back for this partition?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -216,10 +217,10 @@ class ReplicaFetcherThread(name: String,
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
-      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
+      if (!fetchSessionHandler.handleResponse(fetchResponse, fetchRequest.latestAllowedVersion())) {

Review comment:
       Should we use the latest version or fetchRequestVersion guarded by IBP?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org