Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/14 22:25:55 UTC

[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r649585182



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -817,20 +839,26 @@ class KafkaApis(val requestChannel: RequestChannel,
       def createResponse(throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
         val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        unconvertedFetchResponse.data().responses().forEach { topicResponse =>
+          topicResponse.partitions().forEach{ unconvertedPartitionData =>

Review comment:
       Space after forEach.

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -593,19 +619,22 @@ class FetchSessionCache(private val maxEntries: Int,
     * @param now                The current time in milliseconds.
     * @param privileged         True if the new entry we are trying to create is privileged.
     * @param size               The number of cached partitions in the new entry we are trying to create.
-    * @param createPartitions   A callback function which creates the map of cached partitions.
+    * @param version            The version of the request
+    * @param createPartitions   A callback function which creates the map of cached partitions and the mapping from
+    *                           topic name for topic ID for the topics.

Review comment:
       from topic name for topic ID => from topic name to topic ID ?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -213,9 +335,22 @@ public FetchRequestData build() {
                 }
                 sessionPartitions = next;
                 next = null;
+                canUseTopicIds = topicIds.keySet().containsAll(sessionPartitions.keySet().stream().map(
+                    tp -> tp.topic()).collect(Collectors.toSet()));
+                // Only add topic IDs to the session if we are using topic IDs.
+                if (canUseTopicIds) {

Review comment:
       Should we set sessionTopicIds and sessionTopicNames to empty maps if canUseTopicIds is false?
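
A minimal sketch of what resetting the maps could look like, assuming the intent is that no stale IDs survive when canUseTopicIds turns out to be false. The class `SessionTopicIds` and its `rebuild` method are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid` so the example has no external dependencies; this is not the code in the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified stand-in for the session state in FetchSessionHandler.
final class SessionTopicIds {
    Map<String, UUID> sessionTopicIds = new HashMap<>();
    Map<UUID, String> sessionTopicNames = new HashMap<>();
    boolean canUseTopicIds;

    // topicIds: IDs known for this request; sessionTopics: topic names in the session.
    void rebuild(Map<String, UUID> topicIds, Set<String> sessionTopics) {
        canUseTopicIds = topicIds.keySet().containsAll(sessionTopics);
        // Always start from empty maps so entries from an earlier request cannot linger.
        sessionTopicIds = new HashMap<>();
        sessionTopicNames = new HashMap<>();
        if (canUseTopicIds) {
            topicIds.forEach((name, id) -> {
                sessionTopicIds.put(name, id);
                sessionTopicNames.put(id, name);
            });
        }
    }
}
```

With this shape, a rebuild that finds a topic without an ID leaves both maps empty rather than holding the previous request's entries.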

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -216,12 +217,21 @@ class ReplicaFetcherThread(name: String,
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
-      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
-        Map.empty
+      if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
+        if (fetchResponse.error() == Errors.UNKNOWN_TOPIC_ID)
+          throw new UnknownTopicIdException("There was a topic ID in the request that was unknown to the server.")
+        else if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR)
+          throw new FetchSessionTopicIdException("There was a topic ID in the request that was inconsistent with the session.")
+        else
+          Map.empty
       } else {
-        fetchResponse.responseData.asScala
+        fetchResponse.responseData(fetchSessionHandler.sessionTopicNames, clientResponse.requestHeader().apiVersion()).asScala
       }
     } catch {
+      case unknownId: UnknownTopicIdException =>
+        throw unknownId
+      case sessionUnknownId: FetchSessionTopicIdException =>
+        throw sessionUnknownId

Review comment:
       If we get FetchSessionTopicIdException, the existing session is going to be invalid. So, it seems that we should start a new session? The same thing seems to apply to UnknownTopicIdException.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -80,19 +91,9 @@ public Errors error() {
         return Errors.forCode(data.errorCode());
     }
 
-    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData() {
-        if (responseData == null) {
-            synchronized (this) {
-                if (responseData == null) {
-                    responseData = new LinkedHashMap<>();
-                    data.responses().forEach(topicResponse ->
-                            topicResponse.partitions().forEach(partition ->
-                                    responseData.put(new TopicPartition(topicResponse.topic(), partition.partitionIndex()), partition))
-                    );
-                }
-            }
-        }
-        return responseData;
+    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) {
+        return toResponseDataMap(topicNames, version);

Review comment:
       Should we just inline toResponseDataMap() here?

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -70,6 +71,7 @@ object FetchSession {
   * localLogStartOffset is the log start offset of the partition on this broker.
   */
 class CachedPartition(val topic: String,
+                      val topicId: Uuid,

Review comment:
       Should we include topicId in hashCode() and equals()?
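
If the answer is yes, the change might look roughly like the sketch below. `PartitionKey` is a hypothetical, simplified stand-in for CachedPartition (written in Java rather than Scala), and `java.util.UUID` replaces Kafka's `Uuid` to keep the example self-contained. It only illustrates the effect of including the ID in equality; whether that is the right semantics for the session cache is the open question here.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for CachedPartition, reduced to the identity fields.
final class PartitionKey {
    private final String topic;
    private final UUID topicId;
    private final int partition;

    PartitionKey(String topic, UUID topicId, int partition) {
        this.topic = topic;
        this.topicId = topicId;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PartitionKey)) return false;
        PartitionKey other = (PartitionKey) o;
        // topicId takes part in equality, so a recreated topic yields a different key.
        return partition == other.partition
            && topic.equals(other.topic)
            && topicId.equals(other.topicId);
    }

    @Override
    public int hashCode() {
        // Hash over the same fields that equals() compares.
        return Objects.hash(topic, topicId, partition);
    }
}
```

Two keys with the same name and partition but different IDs then compare as unequal, which is what distinguishes a recreated topic from the original.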

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -91,20 +91,23 @@ class ReplicaAlterLogDirsThread(name: String,
       }
     }
 
+    val fetchData = request.fetchData(replicaMgr.metadataCache.topicIdsToNames())
+
     replicaMgr.fetchMessages(
       0L, // timeout is 0 so that the callback will be executed immediately
       Request.FutureLocalReplicaId,
       request.minBytes,
       request.maxBytes,
       false,
-      request.fetchData.asScala.toSeq,
+      fetchData.asScala.toSeq,
+      replicaMgr.metadataCache.topicNamesToIds(),

Review comment:
       Since the metadata cache could change, it's probably slightly better to get topicIdsToNames and topicNamesToIds once from the metadata cache so that they are consistent.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -412,6 +412,12 @@ abstract class AbstractFetcherThread(name: String,
                        "expected to persist.")
                   partitionsWithError += topicPartition
 
+                case Errors.INCONSISTENT_TOPIC_ID =>

Review comment:
       Do we need to handle UNKNOWN_TOPIC_ID here too?

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -395,12 +416,13 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
-      val newTopicIds = updateMetadataRequest.topicStates().asScala
-        .map(topicState => (topicState.topicName(), topicState.topicId()))
-        .filter(_._2 != Uuid.ZERO_UUID).toMap
       val topicIds = mutable.Map.empty[String, Uuid]
       topicIds ++= metadataSnapshot.topicIds
-      topicIds ++= newTopicIds
+      val (newTopicIds, newZeroIds) = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .partition(_._2 != Uuid.ZERO_UUID)

Review comment:
       Could we use case to avoid unnamed reference _._2 to make it easier to read?

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -274,9 +278,14 @@ class ReplicaAlterLogDirsThread(name: String,
     val fetchRequestOpt = if (requestMap.isEmpty) {
       None
     } else {
+      val version: Short = if (ApiKeys.FETCH.latestVersion >= 13 && topicIds.containsKey(tp.topic()))

Review comment:
       Is the test ApiKeys.FETCH.latestVersion >= 13 necessary? This code is added when we introduce version 13 as the latest fetch version.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -294,8 +296,9 @@ class ReplicaFetcherThread(name: String,
     val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {
       None
     } else {
+      val version: Short = if (fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else fetchRequestVersion

Review comment:
       Is the test ApiKeys.FETCH.latestVersion >= 13 necessary? This code is added when we introduce version 13 as the latest fetch version.

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,23 +268,63 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
         private final boolean copySessionPartitions;
+        private boolean missingTopicIds;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
+        public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
             next.put(topicPartition, data);
+            // topicIds do not change between adding partitions and building, so we can use putIfAbsent
+            if (!topicId.equals(Uuid.ZERO_UUID)) {
+                topicIds.putIfAbsent(topicPartition.topic(), topicId);

Review comment:
       Does this work with topic recreation? Will a client be stuck with the old topicId when topic is recreated?
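
The map-level behaviour behind this question can be shown in isolation. The sketch below is hypothetical (`TopicIdTracker` is not a Kafka class, and `java.util.UUID` stands in for Kafka's `Uuid`): `putIfAbsent` keeps the first ID recorded for a name, whereas `put` replaces it and lets the caller detect the change. Whether the Builder lives long enough for this to matter in practice is not something the snippet settles.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper contrasting the two ways of recording a topic ID.
final class TopicIdTracker {
    private final Map<String, UUID> topicIds = new HashMap<>();

    // Mirrors the putIfAbsent call in the diff: the first ID seen for a name wins.
    void addKeepingFirst(String topic, UUID topicId) {
        topicIds.putIfAbsent(topic, topicId);
    }

    // Alternative: overwrite, and report whether an existing ID was replaced by a different one.
    boolean addReplacing(String topic, UUID topicId) {
        UUID previous = topicIds.put(topic, topicId);
        return previous != null && !previous.equals(topicId);
    }

    UUID idFor(String topic) {
        return topicIds.get(topic);
    }
}
```

If the same map instance were to see both the old and the new ID of a recreated topic, the first variant would silently keep the old one.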

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -945,10 +945,12 @@ private boolean hasValidClusterId(String requestClusterId) {
             return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
         }
 
-        if (!hasValidTopicPartition(request, log.topicPartition())) {
+        if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) {

Review comment:
       The KIP talks about bootstrapping the topicId for the metadata topic. Is that part done already? I don't see it included in this PR.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
                                         fetchTarget.id());
                                 return;
                             }
-                            if (!handler.handleResponse(response)) {
+                            if (!handler.handleResponse(response, maxVersion)) {

Review comment:
       maxVersion is not necessarily the exact version used for the Fetch request. The exact version is determined in NetworkClient.doSend() based on the ApiVersions response. So, here, it seems that we need to pass in the exact version number?

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -261,6 +267,21 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {
     }.toSet
   }
 
+  def topicNamesToIds(): util.Map[String, Uuid] = {
+    metadataSnapshot.topicIds.asJava
+  }
+
+  def topicIdsToNames(): util.Map[Uuid, String] = {
+    metadataSnapshot.topicNames.asJava
+  }
+
+  /**
+   * This method returns a map from topic names to IDs and a map from topic IDs to names
+   */
+  def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = {
+    (topicNamesToIds(), topicIdsToNames())

Review comment:
       Since metadataSnapshot could change anytime, it's more consistent if we save a copy of metadataSnapshot and derive both maps from the same cached value.
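
One way to get that consistency is to hold both maps in a single immutable snapshot and read the reference only once. The sketch below is a hypothetical Java illustration (`TopicIdCache` and `Snapshot` are invented names, and `java.util.UUID` stands in for Kafka's `Uuid`); the actual MetadataCache is Scala and structured differently.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical cache that swaps in a whole new snapshot on every update.
final class TopicIdCache {
    // Immutable snapshot holding both directions of the mapping.
    static final class Snapshot {
        final Map<String, UUID> namesToIds;
        final Map<UUID, String> idsToNames;

        Snapshot(Map<String, UUID> source) {
            Map<UUID, String> reverse = new HashMap<>();
            source.forEach((name, id) -> reverse.put(id, name));
            this.namesToIds = Collections.unmodifiableMap(new HashMap<>(source));
            this.idsToNames = Collections.unmodifiableMap(reverse);
        }
    }

    private volatile Snapshot snapshot = new Snapshot(new HashMap<>());

    void update(Map<String, UUID> namesToIds) {
        snapshot = new Snapshot(namesToIds);
    }

    // A single volatile read: both maps returned here come from the same snapshot.
    Snapshot topicIdInfo() {
        return snapshot;
    }
}
```

A caller that keeps the returned `Snapshot` sees a matching pair of maps even if `update` runs concurrently, whereas two separate accessor calls could straddle an update.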

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -255,19 +390,41 @@ public FetchRequestData build() {
                 sessionPartitions.put(topicPartition, nextData);
                 added.add(topicPartition);
             }
+
+            // Check session topic IDs are consistent and add new IDs to the session.
+            boolean inconsistentTopicIds = processTopicIds();
+
+            // Close the session if we were missing topic IDs or had inconsistent topic IDs.
+            boolean closeSession = missingTopicIds || inconsistentTopicIds;
+
+            if (closeSession) {
+                nextMetadata = nextMetadata.nextCloseExisting();
+            }
+
             if (log.isDebugEnabled()) {
-                log.debug("Built incremental fetch {} for node {}. Added {}, altered {}, removed {} " +
-                          "out of {}", nextMetadata, node, partitionsToLogString(added),
-                          partitionsToLogString(altered), partitionsToLogString(removed),
-                          partitionsToLogString(sessionPartitions.keySet()));
+                if (closeSession) {
+                    log.debug("Built incremental fetch {} for node {}. Found inconsistent topic ID usage, so sending a " +
+                            "request to close the session", nextMetadata, node);

Review comment:
       It might be useful to include the topic name and topic id (old and new) for those inconsistent topic IDs.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -319,12 +340,27 @@ public int maxBytes() {
         return data.maxBytes();
     }
 
-    public Map<TopicPartition, PartitionData> fetchData() {
-        return fetchData;
+    // For versions 13+, throws UnknownTopicIdException if the topic ID was unknown to the server.
+    public Map<TopicPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) throws UnknownTopicIdException {
+        return toPartitionDataMap(data.topics(), topicNames);

Review comment:
       Since toPartitionDataMap() is only called here, should we just inline it here?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -54,6 +57,8 @@
  *     not supported by the fetch request version
  * - {@link Errors#CORRUPT_MESSAGE} If corrupt message encountered, e.g. when the broker scans the log to find
  *     the fetch offset after the index lookup
+ * - {@link Errors#UNKNOWN_TOPIC_ID} If the request contains a topic ID unknown to the broker or a partition in the session has

Review comment:
       Should we document FETCH_SESSION_TOPIC_ID_ERROR too?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -71,6 +76,12 @@ public FetchResponseData data() {
         return data;
     }
 
+    /**
+     * From version 3 or later, the authorized and existing entries in `FetchRequest.fetchData` should be in the same order in `responseData`.
+     * Version 13 introduces topic IDs which mean there may be unresolved partitions. If there is any unknown topic ID in the request, the
+     * response will contain a top-level UNKNOWN_TOPIC_ID error and UNKNOWN_TOPIC_ID errors on all the partitions.
+     * We may also return UNKNOWN_TOPIC_ID for a given partition when that partition in the session has a topic ID inconsistent with the broker.

Review comment:
       Should UNKNOWN_TOPIC_ID be FETCH_SESSION_TOPIC_ID_ERROR now?

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -165,7 +233,21 @@ public String toString() {
                         prefix = ", ";
                     }
                 }
-                bld.append("))");
+                bld.append("), topicIds=(");
+                prefix = "";
+                for (Map.Entry<String, Uuid> entry : topicIds.entrySet()) {
+                    bld.append(prefix);
+                    bld.append(entry.getKey());
+                    bld.append(": ");
+                    bld.append(entry.getValue());
+                    prefix = ", ";
+                }
+                if (canUseTopicIds) {
+                    bld.append("), canUseTopicIds=True");
+                } else {
+                    bld.append("), canUseTopicIds=False");
+                }

Review comment:
       Could we do the topicIds part once after the if/else block to avoid duplication?
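
One reading of this suggestion is that the shared text should be appended once and only the boolean should vary. A hypothetical, self-contained sketch of that shape follows (`TopicIdFormatter` is an invented name; `java.util.UUID` stands in for Kafka's `Uuid`). Note that appending a Java boolean prints `true`/`false` in lower case, unlike the `True`/`False` literals in the diff.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical helper showing the toString fragment without the if/else.
final class TopicIdFormatter {
    static String describe(Map<String, UUID> topicIds, boolean canUseTopicIds) {
        StringBuilder bld = new StringBuilder("topicIds=(");
        String prefix = "";
        for (Map.Entry<String, UUID> entry : topicIds.entrySet()) {
            bld.append(prefix).append(entry.getKey()).append(": ").append(entry.getValue());
            prefix = ", ";
        }
        // Single append instead of two branches that differ only in the literal.
        bld.append("), canUseTopicIds=").append(canUseTopicIds);
        return bld.toString();
    }
}
```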

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
     private LinkedHashMap<TopicPartition, PartitionData> sessionPartitions =
         new LinkedHashMap<>(0);
 
+    /**
+     * All of the topic ids mapped to topic names for topics which exist in the fetch request session.
+     */
+    private Map<String, Uuid> sessionTopicIds = new HashMap<>(0);
+
+    /**
+     * All of the topic names mapped to topic ids for topics which exist in the fetch request session.
+     */
+    private Map<Uuid, String> sessionTopicNames = new HashMap<>(0);
+
+    public Map<Uuid, String> sessionTopicNames() {
+        return sessionTopicNames;
+    }
+
+    private boolean canUseTopicIds = false;

Review comment:
       > After running through some tests I realized why this didn't work. We can go from version 13 to version 12 within the session, but we can't go from 12 to 13. This is because we may have topics without IDs in the session. We will try to return them using version 13 and they are all zero UUID. (We also have this issue when we send a full request version 12 and the subsequent request is empty. We could try to send version 13 request since we vacuously have IDs for all topics in the request, but if we do have responses for the topics, then we will try to send them back without topic IDs) If we tried to resolve them, we may end up in a case where there is no valid ID and also no way to communicate this (since we send back IDs). So I think we do need to store the state of the previous request version in the session.
   
   I am wondering if this solves the problem completely. The decision to use a version 13 fetch request also depends on the Kafka version on the broker. So, even if the client has all topic ids, the client may still send version 12 fetch requests to a broker. So, canUseTopicIds doesn't accurately capture whether a version 13 fetch request has been used.
   
   Another possibility is to handle the switching from version 12 to 13 of the fetch requests on the server side in FetchSession. FetchSession already stores usesTopicIds. So, if usesTopicIds is false and a fetch request passes in topicId, we could send an error to the client to force the client to establish a new session. If we do this, we probably don't need to cache canUseTopicIds in the client fetch session. We can just calculate canUseTopicIds independently for each request. Will this approach be better?
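
The server-side check proposed above could be sketched roughly as follows. Everything here is hypothetical: `SessionVersionCheck`, the `Result` enum and its `SESSION_MISMATCH` value are invented for illustration, and the comment does not say which Kafka error code would actually be returned. Only the 12-to-13 direction is rejected, since the quoted message says going from 13 to 12 within a session works.

```java
// Hypothetical sketch of the broker-side decision described in the comment.
final class SessionVersionCheck {
    enum Result { OK, SESSION_MISMATCH }

    // sessionUsesTopicIds: how the session was established (modelled on usesTopicIds).
    // requestUsesTopicIds: whether the incoming fetch request carries topic IDs (version 13+).
    static Result validate(boolean sessionUsesTopicIds, boolean requestUsesTopicIds) {
        if (!sessionUsesTopicIds && requestUsesTopicIds) {
            // The session may hold partitions without IDs, so force the client to start over.
            return Result.SESSION_MISMATCH;
        }
        return Result.OK;
    }
}
```

With a check like this on the broker, the client would not need to remember which version earlier requests used; it could decide per request and react to the error by opening a new session.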

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -119,18 +120,50 @@ public static FetchResponse parse(ByteBuffer buffer, short version) {
         return new FetchResponse(new FetchResponseData(new ByteBufferAccessor(buffer), version));
     }
 
+    private LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> toResponseDataMap(Map<Uuid, String> topicIdToNameMap, short version) {

Review comment:
       We choose to cache responseData here but not in FetchRequest. Is there a particular reason for this inconsistency?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -283,14 +290,18 @@ public void onSuccess(ClientResponse resp) {
                                         fetchTarget.id());
                                 return;
                             }
-                            if (!handler.handleResponse(response)) {
+                            if (!handler.handleResponse(response, maxVersion)) {
+                                if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR || response.error() == Errors.UNKNOWN_TOPIC_ID) {
+                                    metadata.requestUpdate();

Review comment:
       Should we force close the FetchSession in this case too?



