Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/11 01:59:24 UTC

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

jeffkbkim commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1066438547


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one.
+                        existingTopic.partitions().addAll(newTopic.partitions());

Review Comment:
   Q: From the code, it seems that `existingTopic` can only include partitions that failed in some way. We are assuming there is no overlap between the existing partitions and the `newTopic` partitions. Should we add a check?
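   
   A minimal sketch of the check being suggested, written in Scala for illustration even though the `Builder` itself is Java, and assuming only the generated `OffsetCommitResponseData` accessors already used in this diff. The helper name and the use of `require` are placeholders for whatever error handling the Builder should actually do.
   
   ```scala
   import scala.jdk.CollectionConverters._
   import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic
   
   // Fails fast if the topic being merged in already contains a response for one of
   // the partitions present in the existing topic entry.
   def assertNoPartitionOverlap(
     existingTopic: OffsetCommitResponseTopic,
     newTopic: OffsetCommitResponseTopic
   ): Unit = {
     val existingIndexes = existingTopic.partitions.asScala.map(_.partitionIndex).toSet
     val duplicates = newTopic.partitions.asScala
       .map(_.partitionIndex)
       .filter(existingIndexes.contains)
     require(duplicates.isEmpty,
       s"Topic ${newTopic.name} already has responses for partitions ${duplicates.mkString(", ")}")
   }
   ```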



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(

Review Comment:
   nit: `getOrAddTopic` makes more sense to me



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {
+              topicWithValidPartitions.partitions.add(partition)
+            } else {
+              responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            }
+          }
+
+          if (!topicWithValidPartitions.partitions.isEmpty) {
+            authorizedTopicsRequest += topicWithValidPartitions
+          }
         }
       }
 
-      val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
-
-      if (authorizedTopicRequestInfo.isEmpty)
-        sendResponseCallback(Map.empty)
-      else if (header.apiVersion == 0) {
-        // for version 0 always store offsets to ZK
-        val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit requests"))
-        val responseInfo = authorizedTopicRequestInfo.map {
-          case (topicPartition, partitionData) =>
-            try {
-              if (partitionData.committedMetadata() != null
-                && partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-              else {
-                zkSupport.zkClient.setOrCreateConsumerOffset(
-                  offsetCommitRequest.data.groupId,
-                  topicPartition,
-                  partitionData.committedOffset)
-                (topicPartition, Errors.NONE)
-              }
-            } catch {
-              case e: Throwable => (topicPartition, Errors.forException(e))
-            }
-        }
-        sendResponseCallback(responseInfo)
+      if (authorizedTopicsRequest.isEmpty) {
+        requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+        CompletableFuture.completedFuture(())
+      } else if (request.header.apiVersion == 0) {
+        // For version 0, always store offsets to ZK.
+        commitOffsetsToZookeeper(
+          request,
+          offsetCommitRequest,
+          authorizedTopicsRequest,
+          responseBuilder
+        )
       } else {
-        // for version 1 and beyond store offsets in offset manager
-
-        // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
-        // expire timestamp is computed differently for v1 and v2.
-        //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
-        //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that
-        //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
-        //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
-        val currentTimestamp = time.milliseconds
-        val partitionData = authorizedTopicRequestInfo.map { case (k, partitionData) =>
-          val metadata = if (partitionData.committedMetadata == null)
-            OffsetAndMetadata.NoMetadata
-          else
-            partitionData.committedMetadata
+        // For version > 0, store offsets to Coordinator.

Review Comment:
   nit: offsets in



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {
+              topicWithValidPartitions.partitions.add(partition)
+            } else {
+              responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            }
+          }
+
+          if (!topicWithValidPartitions.partitions.isEmpty) {
+            authorizedTopicsRequest += topicWithValidPartitions
+          }
         }
       }
 
-      val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
-
-      if (authorizedTopicRequestInfo.isEmpty)
-        sendResponseCallback(Map.empty)
-      else if (header.apiVersion == 0) {
-        // for version 0 always store offsets to ZK
-        val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit requests"))
-        val responseInfo = authorizedTopicRequestInfo.map {
-          case (topicPartition, partitionData) =>
-            try {
-              if (partitionData.committedMetadata() != null
-                && partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-              else {
-                zkSupport.zkClient.setOrCreateConsumerOffset(
-                  offsetCommitRequest.data.groupId,
-                  topicPartition,
-                  partitionData.committedOffset)
-                (topicPartition, Errors.NONE)
-              }
-            } catch {
-              case e: Throwable => (topicPartition, Errors.forException(e))
-            }
-        }
-        sendResponseCallback(responseInfo)
+      if (authorizedTopicsRequest.isEmpty) {
+        requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+        CompletableFuture.completedFuture(())
+      } else if (request.header.apiVersion == 0) {
+        // For version 0, always store offsets to ZK.

Review Comment:
   nit: offsets in



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -436,4 +440,74 @@ class GroupCoordinatorAdapterTest {
 
     assertEquals(expectedResults, future.get())
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommitTimestamp(now)

Review Comment:
   Can we test values less than -1?
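   
   For example, something along these lines could be added to the request data built above. This is a sketch only; how the adapter should treat such a value is exactly the open question, since the current adapter code forwards anything other than -1 straight into `OffsetAndMetadata.commitTimestamp`.
   
   ```scala
   // Hypothetical extra partition with an out-of-range commit timestamp (< -1).
   new OffsetCommitRequestData.OffsetCommitRequestPartition()
     .setPartitionIndex(1)
     .setCommittedOffset(100)
     .setCommitTimestamp(-2)  // OffsetCommitRequest.DEFAULT_TIMESTAMP is -1
   ```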



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,79 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,

Review Comment:
   I noticed this isn't used here, nor in the other coordinator APIs (other than joinGroup). What's the reason for having this parameter? Are we expecting to use it in the new group coordinator?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,79 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val currentTimeMs = time.milliseconds
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        val topic = byTopics.get(tp.topic) match {
+          case Some(existingTopic) =>
+            existingTopic
+          case None =>
+            val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+            byTopics += tp.topic -> newTopic
+            response.topics.add(newTopic)
+            newTopic
+        }
+
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is defined as now + retention. The retention may be overridden
+    // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit
+    // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that.
+    val expireTimeMs = request.retentionTimeMs match {
+      case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+      case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
+    }
+
+    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        partitions += tp -> new OffsetAndMetadata(
+          offset = partition.committedOffset,
+          leaderEpoch = partition.committedLeaderEpoch match {
+            case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
+            case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch)
+          },
+          metadata = partition.committedMetadata match {
+            case null => OffsetAndMetadata.NoMetadata
+            case metadata => metadata
+          },
+          commitTimestamp = partition.commitTimestamp match {
+            case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
+            case customTimestamp => customTimestamp
+          },

Review Comment:
   I wasn't able to find where we validate the commit timestamp. How do we handle timestamps that are less than -1? I am also curious about `retentionTimeMs`.
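   
   If validation is desired here, a hedged sketch of what it could look like before the `OffsetAndMetadata` map is built. Both the choice of `INVALID_REQUEST` as the failure mode and the decision to validate in the adapter (rather than in the request layer) are assumptions, not something this PR specifies.
   
   ```scala
   import scala.jdk.CollectionConverters._  // other names below are already in scope in this file
   
   // -1 is the "not set" sentinel for both fields; anything else must be non-negative.
   def isValidTimestamp(ts: Long): Boolean =
     ts == OffsetCommitRequest.DEFAULT_TIMESTAMP || ts >= 0
   
   val hasInvalidTimestamp = request.topics.asScala.exists { topic =>
     topic.partitions.asScala.exists(p => !isValidTimestamp(p.commitTimestamp))
   }
   val hasInvalidRetention =
     request.retentionTimeMs != OffsetCommitRequest.DEFAULT_RETENTION_TIME &&
       request.retentionTimeMs < 0
   
   if (hasInvalidTimestamp || hasInvalidRetention) {
     // Assumed failure mode; a per-partition error in the response would be another option.
     future.completeExceptionally(Errors.INVALID_REQUEST.exception())
     return future
   }
   ```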



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }

Review Comment:
   I think we lost this debug log; can we add it back, at least when the future from `newGroupCoordinator.commitOffsets` completes exceptionally? Consider also adding it for `TOPIC_AUTHORIZATION_FAILED` and `UNKNOWN_TOPIC_OR_PARTITION` errors.
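   
   A hedged sketch of one way to bring it back: a small helper in `KafkaApis` that walks an `OffsetCommitResponseData` and logs every failed partition. It assumes the new handler has such a response in hand, both for the locally built TOPIC_AUTHORIZATION_FAILED / UNKNOWN_TOPIC_OR_PARTITION entries and for the coordinator result, before sending it; the helper name is hypothetical.
   
   ```scala
   // Logs each failed partition of an offset commit response, mirroring the old debug line.
   private def logFailedOffsetCommits(
     request: RequestChannel.Request,
     response: OffsetCommitResponseData
   ): Unit = {
     if (isDebugEnabled) {
       response.topics.forEach { topic =>
         topic.partitions.forEach { partition =>
           if (partition.errorCode != Errors.NONE.code) {
             debug(s"Offset commit request with correlation id ${request.header.correlationId} " +
               s"from client ${request.header.clientId} on partition " +
               s"${topic.name}-${partition.partitionIndex} failed due to " +
               s"${Errors.forCode(partition.errorCode).exceptionName}")
           }
         }
       }
     }
   }
   ```
   
   For the exceptional-completion case, the same message could be logged from a `whenComplete` (or equivalent) on the future returned by `newGroupCoordinator.commitOffsets` before the error response is sent.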



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -436,4 +440,74 @@ class GroupCoordinatorAdapterTest {
 
     assertEquals(expectedResults, future.get())
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)

Review Comment:
   Can we test values less than -1?
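   
   Similar to the commit-timestamp case above, only a sketch of the input; the expected outcome is the open question, since today the adapter turns any value other than -1 into `now + retentionTimeMs`.
   
   ```scala
   // Hypothetical variant of the request above with an out-of-range retention time (< -1).
   val dataWithBadRetention = new OffsetCommitRequestData()
     .setGroupId("group")
     .setMemberId("member")
     .setGenerationId(10)
     .setRetentionTimeMs(-2)  // OffsetCommitRequest.DEFAULT_RETENTION_TIME is -1
   ```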



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org