Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/17 18:47:22 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

chia7712 commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r671730072



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -186,36 +250,65 @@ public String toString() {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
+        private Map<String, Uuid> topicIds;
         private final boolean copySessionPartitions;
+        private int partitionsWithoutTopicIds = 0;
 
         Builder() {
             this.next = new LinkedHashMap<>();
+            this.topicIds = new HashMap<>();
             this.copySessionPartitions = true;
         }
 
         Builder(int initialSize, boolean copySessionPartitions) {
             this.next = new LinkedHashMap<>(initialSize);
+            this.topicIds = new HashMap<>(initialSize);
             this.copySessionPartitions = copySessionPartitions;
         }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
          */
-        public void add(TopicPartition topicPartition, PartitionData data) {
+        public void add(TopicPartition topicPartition, Uuid topicId, PartitionData data) {
             next.put(topicPartition, data);
+            // topicIds should not change between adding partitions and building, so we can use putIfAbsent
+            if (!topicId.equals(Uuid.ZERO_UUID)) {
+                topicIds.putIfAbsent(topicPartition.topic(), topicId);
+            } else {
+                partitionsWithoutTopicIds++;
+            }
         }
 
         public FetchRequestData build() {
+            // For incremental sessions we don't have to worry about removed partitions when using topic IDs because we will either
+            //   a) already be using topic IDs and have the ID in the session
+            //   b) not be using topic IDs before and will close the session upon trying to use them

Review comment:
       If a partition is being removed, its topic ID is NOT added to this builder. The fetch request with version=13 then carries `topic='xxx'` and `id=AAA...`, but the topic name gets reset to an empty string by the Kafka protocol. Hence, the following error message is produced.
   
   ```
   Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=13, clientId=broker-2-fetcher-0, correlationId=1) -- FetchRequestData(clusterId=null, replicaId=2, maxWaitMs=500, minBytes=1, maxBytes=10485760, isolationLevel=0, sessionId=620866590, sessionEpoch=1, topics=[], forgottenTopicsData=[ForgottenTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[3, 0, 6, 9])], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=13, clientId=broker-2-fetcher-0, correlationId=1), connectionId='127.0.0.1:36157-127.0.0.1:45350-0', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=unknown, softwareVersion=unknown), fromPrivilegedListener=true, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@56b8fb84]) (kafka.server.KafkaApis:76)
   org.apache.kafka.common.errors.UnknownTopicIdException: Topic Id AAAAAAAAAAAAAAAAAAAAAA in FetchRequest was unknown to the server
   ```
   
   If this is expected behavior, should we change the log level from `ERROR` to `DEBUG`, or add documentation saying `This error may be returned transiently when xxx`?
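
To make the scenario above concrete, here is a minimal sketch of the bookkeeping in question. It is a simplified, hypothetical model and not the actual `FetchSessionHandler` code: `ForgottenTopicIdSketch` and `idFor` are illustrative names, and `java.util.UUID` stands in for Kafka's `Uuid`. It only shows that a topic which never passes through `add` has no entry in `topicIds`, so a lookup for a removed partition can only yield the zero ID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Simplified, hypothetical model of the builder's topic ID bookkeeping.
final class ForgottenTopicIdSketch {
    // Stand-in for Kafka's Uuid.ZERO_UUID (assumption: an all-zero ID).
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final Map<String, UUID> topicIds = new HashMap<>();

    // Mirrors the diff: only partitions added for the next fetch register their topic ID.
    void add(String topic, UUID topicId) {
        if (!topicId.equals(ZERO_UUID)) {
            topicIds.putIfAbsent(topic, topicId);
        }
    }

    // Hypothetical lookup: the ID available for a topic, whether it is fetched or forgotten.
    UUID idFor(String topic) {
        return topicIds.getOrDefault(topic, ZERO_UUID);
    }

    public static void main(String[] args) {
        ForgottenTopicIdSketch builder = new ForgottenTopicIdSketch();
        builder.add("kept", new UUID(1L, 2L));
        // "removed" was part of the session but is not added again, so no ID is known for it.
        System.out.println("kept    -> " + builder.idFor("kept"));
        System.out.println("removed -> " + builder.idFor("removed"));
    }
}
```

Under these assumptions the removed topic resolves to the zero ID, which matches the `topicId=AAAAAAAAAAAAAAAAAAAAAA` seen in the `ForgottenTopic` entry of the log above.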
   

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -758,16 +814,25 @@ class FetchManager(private val time: Time,
           case Some(session) => session.synchronized {
             if (session.epoch != reqMetadata.epoch) {
               debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
-                s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
+                s"${session.epoch}, but got ${reqMetadata.epoch} instead.")
               new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
+            } else if (session.usesTopicIds && reqVersion < 13 || !session.usesTopicIds && reqVersion >= 13)  {

Review comment:
       I noticed the following warning message on our cluster (built from trunk).
   
   ```
   WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=0, ....(kafka.server.ReplicaFetcherThread:72)
   org.apache.kafka.common.errors.FetchSessionTopicIdException: The fetch session encountered inconsistent topic ID usage
   ```
   
   According to this code, changing the request version within a session is disallowed (please correct me if I misunderstood). Should the fetcher thread keep the version in the session metadata? Or should we change the log level to `DEBUG`?
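
For reference, the condition in the diff can be read as "the session's topic ID usage disagrees with what the request version implies". The sketch below is a standalone Java illustration, not code from the PR: `SessionVersionCheck`, `asWritten`, and `isInconsistent` are hypothetical names, and the threshold of 13 is taken from the diff. It checks that the condition as written is equivalent to a single inequality.

```java
// Standalone illustration of the session/version consistency check; not code from the PR.
final class SessionVersionCheck {
    // Taken from the diff: requests at version 13 or above are treated as using topic IDs.
    static final int FIRST_TOPIC_ID_VERSION = 13;

    // The condition as it appears in the diff (&& binds tighter than ||).
    static boolean asWritten(boolean usesTopicIds, int reqVersion) {
        return usesTopicIds && reqVersion < 13 || !usesTopicIds && reqVersion >= 13;
    }

    // Equivalent formulation: the session and the request disagree about topic ID usage.
    static boolean isInconsistent(boolean usesTopicIds, int reqVersion) {
        return usesTopicIds != (reqVersion >= FIRST_TOPIC_ID_VERSION);
    }

    public static void main(String[] args) {
        for (int version = 0; version <= 14; version++) {
            for (boolean usesTopicIds : new boolean[] {false, true}) {
                if (asWritten(usesTopicIds, version) != isInconsistent(usesTopicIds, version)) {
                    throw new AssertionError("forms differ at version " + version);
                }
            }
        }
        System.out.println("both forms agree");
    }
}
```

Read this way, a session created with one kind of request (with or without topic IDs) rejects any later request of the other kind, which is consistent with the warning quoted above.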



