Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/21 21:03:45 UTC

[GitHub] [kafka] dajac opened a new pull request, #12886: [WIP] KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12886:
URL: https://github.com/apache/kafka/pull/12886

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1362010257

   @jolshan @OmniaGM Thanks for your comments. I just updated the PR. I have tried to simplify the code in KafkaApis as much as possible. Let me know what you think.




[GitHub] [kafka] OmniaGM commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
OmniaGM commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1030626743


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.common.OffsetAndMetadata
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetCommitRequestData, OffsetCommitResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.OffsetCommitRequest
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import java.util
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+import scala.collection.{Map, mutable}
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  private val coordinator: GroupCoordinator,
+  private val time: Time
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(
+    context: GroupCoordinatorRequestContext,
+    request: JoinGroupRequestData
+  ): CompletableFuture[JoinGroupResponseData] = {
+    val future = new CompletableFuture[JoinGroupResponseData]()
+
+    def callback(joinResult: JoinGroupResult): Unit = {
+      future.complete(new JoinGroupResponseData()
+        .setErrorCode(joinResult.error.code)
+        .setGenerationId(joinResult.generationId)
+        .setProtocolType(joinResult.protocolType.orNull)
+        .setProtocolName(joinResult.protocolName.orNull)
+        .setLeader(joinResult.leaderId)
+        .setSkipAssignment(joinResult.skipAssignment)
+        .setMemberId(joinResult.memberId)
+        .setMembers(joinResult.members.asJava)
+      )
+    }
+
+    val groupInstanceId = Option(request.groupInstanceId)
+
+    // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
+    // and groupInstanceId is configured to unknown.
+    val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
+
+    val protocols = request.protocols.valuesList.asScala.map { protocol =>
+      (protocol.name, protocol.metadata)
+    }.toList
+
+    val supportSkippingAssignment = context.apiVersion >= 9
+
+    coordinator.handleJoinGroup(
+      request.groupId,
+      request.memberId,
+      groupInstanceId,
+      requireKnownMemberId,
+      supportSkippingAssignment,
+      context.clientId,
+      context.clientAddress.toString,
+      request.rebalanceTimeoutMs,
+      request.sessionTimeoutMs,
+      request.protocolType,
+      protocols,
+      callback,
+      Option(request.reason),
+      RequestLocal(context.bufferSupplier)
+    )
+
+    future
+  }
+
+  override def commitOffsets(
+    context: GroupCoordinatorRequestContext,
+    request: OffsetCommitRequestData
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new util.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()

Review Comment:
   I noticed that `byTopics` uses `java.util.HashMap` while `partitions` on line 129 uses Scala's `mutable.HashMap`. Is there any reason why we are using two different map types? Can't we use `mutable.HashMap` for both here?
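   
   For illustration, a minimal sketch (not the PR's code) of how the loop could build the response with Scala's `mutable.HashMap` for both maps, using `getOrElseUpdate` in place of the null check that `java.util.HashMap` requires:
   
   ```scala
   import org.apache.kafka.common.TopicPartition
   import org.apache.kafka.common.message.OffsetCommitResponseData
   import org.apache.kafka.common.protocol.Errors
   import scala.collection.{Map, mutable}
   
   def buildResponse(commitStatus: Map[TopicPartition, Errors]): OffsetCommitResponseData = {
     val response = new OffsetCommitResponseData()
     val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
   
     commitStatus.foreach { case (tp, error) =>
       // getOrElseUpdate creates and registers the topic entry on first use,
       // avoiding the explicit null check needed with java.util.HashMap.
       val topic = byTopics.getOrElseUpdate(tp.topic, {
         val t = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
         response.topics.add(t)
         t
       })
       topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
         .setPartitionIndex(tp.partition)
         .setErrorCode(error.code))
     }
   
     response
   }
   ```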





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054814682


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   This comment is a little confusing. 
   So it seems like I understand the v1 semantics -- we use the commit timestamp, if provided, as "now".
   
   For v2 and beyond, I'm a bit confused about the last two bullets. They make it seem like there is no difference between v2-v4 and v5+, but I think the difference is that the retention can no longer be overridden in v5+. That part is unclear in the last bullet, as it says "partition expiration" while "partition retention" is the field name.
   
   This is my understanding based on the code
   
   ```
   version:  (can define commit time aka "now"), (can define retention time)
      1                 yes                                no
      2                 no                                 yes
      3                 no                                 yes
      4                 no                                 yes
     5+                 no                                 no
   ```
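   
   For illustration, a hedged sketch of the computation this table describes (the helper and its parameters are hypothetical, not the PR's code); the sentinels are the `DEFAULT_TIMESTAMP` / `DEFAULT_RETENTION_TIME` constants on `OffsetCommitRequest`:
   
   ```scala
   import org.apache.kafka.common.requests.OffsetCommitRequest
   
   // v1 may carry an explicit commit timestamp used as "now"; v2-v4 may carry
   // an explicit retention time; v5+ carries neither, so the broker-side
   // retention always applies.
   def expireTimestampMs(
     apiVersion: Short,
     commitTimestampMs: Long,  // DEFAULT_TIMESTAMP unless set (v1 only)
     retentionTimeMs: Long,    // DEFAULT_RETENTION_TIME unless set (v2-v4)
     nowMs: Long,
     brokerRetentionMs: Long
   ): Long = {
     val effectiveNow =
       if (apiVersion == 1 && commitTimestampMs != OffsetCommitRequest.DEFAULT_TIMESTAMP)
         commitTimestampMs
       else
         nowMs
     val effectiveRetention =
       if (retentionTimeMs != OffsetCommitRequest.DEFAULT_RETENTION_TIME)
         retentionTimeMs
       else
         brokerRetentionMs
     effectiveNow + effectiveRetention
   }
   ```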
   





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1067494981


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one.
+                        existingTopic.partitions().addAll(newTopic.partitions());

Review Comment:
   I think it is ok to keep as is, but maybe make a comment that we assume there are no overlapping partitions?
   
   As a side note, if there were overlap, we would just have two of the same partition in the response, right? One with the error and one without?





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054819922


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {

Review Comment:
   more of a comment than something that needs changing -- it is interesting we check the metadata cache twice -- once to confirm topic existence and then again to confirm partition info existence. Probably not worth changing in this PR though.





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1065606143


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   I rewrote the comment. Let me know what you think.





[GitHub] [kafka] dajac commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1380727110

   Failed tests are not related.




[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1066989454


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,79 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,

Review Comment:
   I have put it everywhere for consistency. We may need it for other methods in the new group coordinator.





[GitHub] [kafka] dajac commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1378782340

   @jeffkbkim @jolshan Updated and rebased the PR.




[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1057149271


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   I think that the term "partition expiration" comes from `OffsetAndMetadata.expireTimestamp`. `expireTimestamp` is indeed derived from `RetentionTimeMs`, which is no longer available from version 5 onward.





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1057809723


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   Hmm. I'm not sure we made this comment much clearer.
   
   I think the main flaw is that it says we can only override the retention time in v2 (which matches the JSON spec), but the first two bullets mention an "explicit retention time". I'm not really sure what that means.
   
   The second thing is enumerating the versions. I think it's clearer to just say that some versions have the option to explicitly set the retention time; v5+, and any version without it set, ignores the expireTimestamp field.
   
   





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054834094


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -52,6 +54,7 @@ CompletableFuture<JoinGroupResponseData> joinGroup(
     );
 
     /**
+<<<<<<< HEAD

Review Comment:
   nit: remove





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1066988206


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one.
+                        existingTopic.partitions().addAll(newTopic.partitions());

Review Comment:
   That's right. I thought about adding a check, but it is costly: the only way to check is to iterate over the existing partitions to see whether the new one is already there. Given that partitions are not supposed to be duplicated by the user of this class, I thought it was not necessary. What do you think?
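   
   For reference, a hedged sketch (hypothetical helper, not the PR's code) of what such a check would look like -- it has to scan the existing partitions for every incoming one, which is the cost mentioned above:
   
   ```scala
   import org.apache.kafka.common.message.OffsetCommitResponseData.{OffsetCommitResponsePartition, OffsetCommitResponseTopic}
   import scala.jdk.CollectionConverters._
   
   def addWithoutDuplicates(
     existingTopic: OffsetCommitResponseTopic,
     newPartitions: java.util.List[OffsetCommitResponsePartition]
   ): Unit = {
     newPartitions.forEach { newPartition =>
       // The generated response classes keep partitions in a plain list, so
       // detecting a duplicate is a linear scan per incoming partition.
       val duplicate = existingTopic.partitions.asScala
         .exists(_.partitionIndex == newPartition.partitionIndex)
       if (duplicate)
         throw new IllegalStateException(s"Partition ${newPartition.partitionIndex} of " +
           s"topic ${existingTopic.name} is already present in the response")
       existingTopic.partitions.add(newPartition)
     }
   }
   ```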
   
   





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054778967


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset commit request
    */
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
+    def sendResponse(response: OffsetCommitResponse): Unit = {
+      trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " +
+        s"client ${request.header.clientId}.")
+
+      if (isDebugEnabled) {
+        response.data.topics.forEach { topic =>
+          topic.partitions.forEach { partition =>
+            if (partition.errorCode != Errors.NONE.code) {
+              debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " +
+                s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}")
+            }
           }
         }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
+      }
+
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    } else {
+      val offsetCommitResponseData = new OffsetCommitResponseData()
+      val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      def makePartitionResponse(
+        partitionIndex: Int,
+        error: Errors
+      ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+        new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(partitionIndex)
+          .setErrorCode(error.code)
+      }
+
+      def addTopicToResponse(
+        topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+        error: Errors
+      ): Unit = {
+        val topicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name)
+        offsetCommitResponseData.topics.add(topicResponse)
+        topic.partitions.forEach { partition =>
+          topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error))
         }
       }
-      sendResponseCallback(errorMap.toMap)
-    } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+      val authorizedTopicsRequest = new util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          var topicRequestWithValidPartitions: OffsetCommitRequestData.OffsetCommitRequestTopic = null

Review Comment:
   Yeah, we basically return partitions if they have committed offsets or an error.





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054833397


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {

Review Comment:
   Yeah -- wondering if it would be an optimization to do the check once, but I guess it's not worth checking partitions if the topic itself doesn't exist 😅 
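   
   For what it's worth, a hedged sketch of the single-lookup alternative (names are hypothetical, not the PR's code): relying on the partition-level lookup alone classifies partitions of a missing topic with the same UNKNOWN_TOPIC_OR_PARTITION error, at the price of probing every partition of a topic that does not exist at all:
   
   ```scala
   import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic
   import org.apache.kafka.common.protocol.Errors
   import scala.jdk.CollectionConverters._
   
   def classifyPartitions(
     topic: OffsetCommitRequestTopic,
     // Stand-in for metadataCache.getPartitionInfo(topic, partition).nonEmpty.
     partitionExists: (String, Int) => Boolean
   ): Map[Int, Errors] = {
     topic.partitions.asScala.map { partition =>
       val error =
         if (partitionExists(topic.name, partition.partitionIndex)) Errors.NONE
         else Errors.UNKNOWN_TOPIC_OR_PARTITION
       partition.partitionIndex -> error
     }.toMap
   }
   ```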





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054830892


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   I realize this comment was copy-pasted, but we can clean it up I think :) 





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1068168361


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -164,5 +166,20 @@ CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetch
         String groupId,
         boolean requireStable
     );
+
+    /**
+     * Commit offsets for a given Group.

Review Comment:
   Yes, that's right.





[GitHub] [kafka] dajac commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1380487750

   @jeffkbkim @jolshan I updated the PR.




[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1067611094


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -164,5 +166,20 @@ CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> fetch
         String groupId,
         boolean requireStable
     );
+
+    /**
+     * Commit offsets for a given Group.

Review Comment:
   Are the descriptions for `fetchOffsets()`, `fetchAllOffsets()`, and `commitOffsets()` written as "Group" instead of "Generic Group" because they can apply to both generic and consumer groups? I just noticed the difference from join/sync/leave group.



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one.
+                        existingTopic.partitions().addAll(newTopic.partitions());

Review Comment:
   That makes sense. @jolshan, that seems to be the case, though since a failed partition is added first, the non-error state may overwrite the error when the consumer parses the response.
   
   Also, +1 on leaving a small note that the code assumes no overlap.
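   
   For illustration only (not from the PR), a minimal Scala sketch of the overlap hazard, assuming the `OffsetCommitResponse.Builder` from the diff above; the topic name and error codes are made up:
   
   ```scala
   import java.util.Collections
   import org.apache.kafka.common.message.OffsetCommitResponseData
   import org.apache.kafka.common.protocol.Errors
   import org.apache.kafka.common.requests.OffsetCommitResponse
   
   // The pre-filled error entry is added first, e.g. after a failed validation.
   val builder = new OffsetCommitResponse.Builder()
     .addPartition("foo", 0, Errors.TOPIC_AUTHORIZATION_FAILED)
   
   // A coordinator response that (incorrectly) overlaps on foo-0.
   val coordinatorResponse = new OffsetCommitResponseData()
     .setTopics(Collections.singletonList(
       new OffsetCommitResponseData.OffsetCommitResponseTopic()
         .setName("foo")
         .setPartitions(Collections.singletonList(
           new OffsetCommitResponseData.OffsetCommitResponsePartition()
             .setPartitionIndex(0)
             .setErrorCode(Errors.NONE.code)))))
   
   // merge() appends rather than deduplicates, so topic "foo" now lists
   // partition 0 twice; a client that folds the payload into a map keyed by
   // partition would let the later NONE entry shadow the earlier error.
   builder.merge(coordinatorResponse)
   ```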
   





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054822770


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {

Review Comment:
   Yeah, I did this on purpose. If the topic does not exist, it does not make sense to verify all the partitions of that topic. Also, in the future, we will add a check for the topic id somewhere here.





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054648954


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset commit request
    */
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
+    def sendResponse(response: OffsetCommitResponse): Unit = {
+      trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " +
+        s"client ${request.header.clientId}.")
+
+      if (isDebugEnabled) {
+        response.data.topics.forEach { topic =>
+          topic.partitions.forEach { partition =>
+            if (partition.errorCode != Errors.NONE.code) {
+              debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " +
+                s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}")
+            }
           }
         }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
+      }
+
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    } else {
+      val offsetCommitResponseData = new OffsetCommitResponseData()
+      val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      def makePartitionResponse(
+        partitionIndex: Int,
+        error: Errors
+      ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+        new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(partitionIndex)
+          .setErrorCode(error.code)
+      }
+
+      def addTopicToResponse(
+        topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+        error: Errors
+      ): Unit = {
+        val topicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name)
+        offsetCommitResponseData.topics.add(topicResponse)
+        topic.partitions.forEach { partition =>
+          topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error))
         }
       }
-      sendResponseCallback(errorMap.toMap)
-    } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+      val authorizedTopicsRequest = new util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          var topicRequestWithValidPartitions: OffsetCommitRequestData.OffsetCommitRequestTopic = null

Review Comment:
   Ah, interesting. For OffsetFetch, do we basically try to fetch, and if it's missing, return the error there?





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1066999003


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,79 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val currentTimeMs = time.milliseconds
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        val topic = byTopics.get(tp.topic) match {
+          case Some(existingTopic) =>
+            existingTopic
+          case None =>
+            val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+            byTopics += tp.topic -> newTopic
+            response.topics.add(newTopic)
+            newTopic
+        }
+
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is defined as now + retention. The retention may be overridden
+    // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit
+    // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that.
+    val expireTimeMs = request.retentionTimeMs match {
+      case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+      case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
+    }
+
+    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        partitions += tp -> new OffsetAndMetadata(
+          offset = partition.committedOffset,
+          leaderEpoch = partition.committedLeaderEpoch match {
+            case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
+            case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch)
+          },
+          metadata = partition.committedMetadata match {
+            case null => OffsetAndMetadata.NoMetadata
+            case metadata => metadata
+          },
+          commitTimestamp = partition.commitTimestamp match {
+            case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
+            case customTimestamp => customTimestamp
+          },

Review Comment:
   It seems that they are not validated anywhere. We basically store whatever we get. As a result, if the provided retention or the commit timestamp is negative, the offset will be expired immediately. This is in line with the behavior prior to this patch. We could improve it (if we want) separately.
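   
   As an illustrative aside (not from the PR), a small sketch of how a negative, unvalidated retention leads to immediate expiration, mirroring the `expireTimeMs` computation quoted above:
   
   ```scala
   // Sketch only: expireTimeMs = now + retentionTimeMs when an explicit
   // retention is provided, with no validation of the client-supplied value.
   val currentTimeMs = 1000000L   // "now" in milliseconds
   val retentionTimeMs = -5000L   // negative value sent by a client
   
   val expireTimeMs = currentTimeMs + retentionTimeMs
   // expireTimeMs == 995000, already in the past, so the stored offset is
   // immediately eligible for expiration.
   assert(expireTimeMs < currentTimeMs)
   ```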





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1067000027


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }

Review Comment:
   We have removed all those debug logs in the previous PRs because the request log gives us the same information in the end.





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1053854859


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset commit request
    */
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
+    def sendResponse(response: OffsetCommitResponse): Unit = {
+      trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " +
+        s"client ${request.header.clientId}.")
+
+      if (isDebugEnabled) {
+        response.data.topics.forEach { topic =>
+          topic.partitions.forEach { partition =>
+            if (partition.errorCode != Errors.NONE.code) {
+              debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " +
+                s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}")
+            }
           }
         }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
+      }
+
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    } else {
+      val offsetCommitResponseData = new OffsetCommitResponseData()
+      val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      def makePartitionResponse(
+        partitionIndex: Int,
+        error: Errors
+      ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+        new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(partitionIndex)
+          .setErrorCode(error.code)
+      }
+
+      def addTopicToResponse(
+        topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+        error: Errors
+      ): Unit = {
+        val topicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name)
+        offsetCommitResponseData.topics.add(topicResponse)
+        topic.partitions.forEach { partition =>
+          topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error))
         }
       }
-      sendResponseCallback(errorMap.toMap)
-    } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+      val authorizedTopicsRequest = new util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          var topicRequestWithValidPartitions: OffsetCommitRequestData.OffsetCommitRequestTopic = null

Review Comment:
   Coming from reviewing OffsetFetch, I noticed we didn't do this copying into a list (or at least it didn't take up as much space). Just curious if this is because of how the old interfaces (i.e., method signatures) were different between the two operations, and if there was a way to make this less verbose.





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1068171485


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one.
+                        existingTopic.partitions().addAll(newTopic.partitions());

Review Comment:
   Updated the comment.





[GitHub] [kafka] jolshan commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1380723669

   Still looks good to me 😄 




[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1067001037


##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -436,4 +440,74 @@ class GroupCoordinatorAdapterTest {
 
     assertEquals(expectedResults, future.get())
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)

Review Comment:
   We could, but it does not add any value, as the value is just copied to OffsetAndMetadata. As explained earlier, there is no validation for this.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -436,4 +440,74 @@ class GroupCoordinatorAdapterTest {
 
     assertEquals(expectedResults, future.get())
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommitTimestamp(now)

Review Comment:
   We could, but it does not add any value, as the value is just copied to OffsetAndMetadata. As explained earlier, there is no validation for this.





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1067488043


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   Thanks David! Looks much clearer. I think the only thing to note is that when the commit time is used, it seems like commit time + retention gives us the expiration. It wasn't completely clear from the comment that that's how the commit time fits into the equation, but it can be inferred. Up to you if you want to change it.
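   
   A quick worked example of that reading (illustrative numbers only, per the discussion above rather than the code in this PR):
   
   ```scala
   // Sketch of the inferred v1 semantics: an explicit commit timestamp
   // replaces "now" in the expiration computation.
   val commitTimestamp = 100L  // client-provided commit time (v1 only)
   val retentionMs = 50L       // retention in effect
   
   val expireTimestamp = commitTimestamp + retentionMs
   // expireTimestamp == 150: the offset expires retentionMs after the
   // client-supplied commit time, not after the broker's receive time.
   ```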





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054582822


##########
core/src/main/java/kafka/server/builders/KafkaApisBuilder.java:
##########
@@ -178,6 +179,7 @@ public KafkaApis build() {
                              metadataSupport,
                              replicaManager,
                              groupCoordinator,
+                             new GroupCoordinatorAdapter(groupCoordinator, time),

Review Comment:
   GroupCoordinatorAdapter is an internal change, so we don't have to expose it in the builder. It only wraps the GroupCoordinator that is passed in.





[GitHub] [kafka] OmniaGM commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
OmniaGM commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1030626609


##########
core/src/main/java/kafka/server/builders/KafkaApisBuilder.java:
##########
@@ -178,6 +179,7 @@ public KafkaApis build() {
                              metadataSupport,
                              replicaManager,
                              groupCoordinator,
+                             new GroupCoordinatorAdapter(groupCoordinator, time),

Review Comment:
   Is there a reason for not having a setter for the GroupCoordinatorAdapter similar to `setGroupCoordinator`?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.common.OffsetAndMetadata
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetCommitRequestData, OffsetCommitResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.OffsetCommitRequest
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import java.util
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+import scala.collection.{Map, mutable}
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  private val coordinator: GroupCoordinator,
+  private val time: Time
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(
+    context: GroupCoordinatorRequestContext,
+    request: JoinGroupRequestData
+  ): CompletableFuture[JoinGroupResponseData] = {
+    val future = new CompletableFuture[JoinGroupResponseData]()
+
+    def callback(joinResult: JoinGroupResult): Unit = {
+      future.complete(new JoinGroupResponseData()
+        .setErrorCode(joinResult.error.code)
+        .setGenerationId(joinResult.generationId)
+        .setProtocolType(joinResult.protocolType.orNull)
+        .setProtocolName(joinResult.protocolName.orNull)
+        .setLeader(joinResult.leaderId)
+        .setSkipAssignment(joinResult.skipAssignment)
+        .setMemberId(joinResult.memberId)
+        .setMembers(joinResult.members.asJava)
+      )
+    }
+
+    val groupInstanceId = Option(request.groupInstanceId)
+
+    // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
+    // and groupInstanceId is configured to unknown.
+    val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
+
+    val protocols = request.protocols.valuesList.asScala.map { protocol =>
+      (protocol.name, protocol.metadata)
+    }.toList
+
+    val supportSkippingAssignment = context.apiVersion >= 9
+
+    coordinator.handleJoinGroup(
+      request.groupId,
+      request.memberId,
+      groupInstanceId,
+      requireKnownMemberId,
+      supportSkippingAssignment,
+      context.clientId,
+      context.clientAddress.toString,
+      request.rebalanceTimeoutMs,
+      request.sessionTimeoutMs,
+      request.protocolType,
+      protocols,
+      callback,
+      Option(request.reason),
+      RequestLocal(context.bufferSupplier)
+    )
+
+    future
+  }
+
+  override def commitOffsets(
+    context: GroupCoordinatorRequestContext,
+    request: OffsetCommitRequestData
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new util.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()

Review Comment:
   I noticed that `byTopics` uses `java.util.HashMap` while `partitions` on line 129 uses Scala's `mutable.HashMap`. Is there any reason why we use two different map types? Can't we use `mutable.HashMap` for both?
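   
   For comparison, a minimal sketch (not from the PR) of how the lookup could read with Scala's `mutable.HashMap` and `getOrElseUpdate`:
   
   ```scala
   import scala.collection.mutable
   import org.apache.kafka.common.message.OffsetCommitResponseData
   
   val response = new OffsetCommitResponseData()
   val byTopics = mutable.HashMap.empty[String, OffsetCommitResponseData.OffsetCommitResponseTopic]
   
   def getOrCreateTopic(name: String): OffsetCommitResponseData.OffsetCommitResponseTopic =
     byTopics.getOrElseUpdate(name, {
       // Evaluated once per topic name; registers the new topic on the
       // response as a side effect, like the explicit null check would.
       val topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(name)
       response.topics.add(topic)
       topic
     })
   ```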





[GitHub] [kafka] jolshan commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1360502862

   Looks like this needs a rebase. I will take another pass when that is complete.




[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054592382


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -407,136 +416,200 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset commit request
    */
   def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
+    def sendResponse(response: OffsetCommitResponse): Unit = {
+      trace(s"Sending offset commit response $response for correlation id ${request.header.correlationId} to " +
+        s"client ${request.header.clientId}.")
+
+      if (isDebugEnabled) {
+        response.data.topics.forEach { topic =>
+          topic.partitions.forEach { partition =>
+            if (partition.errorCode != Errors.NONE.code) {
+              debug(s"Offset commit request with correlation id ${request.header.correlationId} from client ${request.header.clientId} " +
+                s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}")
+            }
           }
         }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
+      }
+
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
+      sendResponse(offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    } else {
+      val offsetCommitResponseData = new OffsetCommitResponseData()
+      val topicsPendingPartitions = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      def makePartitionResponse(
+        partitionIndex: Int,
+        error: Errors
+      ): OffsetCommitResponseData.OffsetCommitResponsePartition = {
+        new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(partitionIndex)
+          .setErrorCode(error.code)
+      }
+
+      def addTopicToResponse(
+        topic: OffsetCommitRequestData.OffsetCommitRequestTopic,
+        error: Errors
+      ): Unit = {
+        val topicResponse = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(topic.name)
+        offsetCommitResponseData.topics.add(topicResponse)
+        topic.partitions.forEach { partition =>
+          topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error))
         }
       }
-      sendResponseCallback(errorMap.toMap)
-    } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+      val authorizedTopicsRequest = new util.ArrayList[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          var topicRequestWithValidPartitions: OffsetCommitRequestData.OffsetCommitRequestTopic = null

Review Comment:
   The big difference here is that we validate that the topic and its partitions exist. This validation is not present in OffsetFetch.





[GitHub] [kafka] jolshan commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1054814682


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   This comment is a little confusing. 
   So it seems like I understand v1 semantics -- we use the commit timestamp, if provided, as "now". 
   
   For v2 and beyond, I'm a bit confused about the last two bullets. It makes it seem like there is no difference between v2-v4 and v5+, but I think the difference is that the retention can no longer be overridden in v5+. That part is unclear in the last bullet, as it says "partition expiration" but "RetentionTimeMs" is the field name.
   
   This is my understanding based on the code
   
   ```
   version:  (can define commit time aka "now"), (can define retention time)
      1                 yes                                no
      2                 no                                 yes
      3                 no                                 yes
      4                 no                                 yes
     5+                 no                                 no
   ```
   





[GitHub] [kafka] dajac merged pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12886:
URL: https://github.com/apache/kafka/pull/12886




[GitHub] [kafka] dajac commented on pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12886:
URL: https://github.com/apache/kafka/pull/12886#issuecomment-1380731056

   I will go ahead and merge it. I can do follow-ups if needed.




[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1066438547


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one.
+                        existingTopic.partitions().addAll(newTopic.partitions());

Review Comment:
   Q: From the code it seems that `existingTopic` can only include partitions that failed in some way. We are assuming that there will be no overlap between the existing partitions and the `newTopic` partitions. Should we add a check?
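   
   For example, a duplicate guard could look roughly like this (sketched in Scala for brevity even though `merge()` is Java; `asScala` assumes `scala.jdk.CollectionConverters._`):
   
   ```
   // Hypothetical check: fail fast if a partition already has a response.
   val existingIndexes = existingTopic.partitions.asScala.map(_.partitionIndex).toSet
   newTopic.partitions.forEach { newPartition =>
     if (existingIndexes.contains(newPartition.partitionIndex))
       throw new IllegalStateException(
         s"Duplicate response for ${newTopic.name}-${newPartition.partitionIndex}")
     existingTopic.partitions.add(newPartition)
   }
   ```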



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(

Review Comment:
   nit: `getOrAddTopic` makes more sense to me



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {
+              topicWithValidPartitions.partitions.add(partition)
+            } else {
+              responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            }
+          }
+
+          if (!topicWithValidPartitions.partitions.isEmpty) {
+            authorizedTopicsRequest += topicWithValidPartitions
+          }
         }
       }
 
-      val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
-
-      if (authorizedTopicRequestInfo.isEmpty)
-        sendResponseCallback(Map.empty)
-      else if (header.apiVersion == 0) {
-        // for version 0 always store offsets to ZK
-        val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit requests"))
-        val responseInfo = authorizedTopicRequestInfo.map {
-          case (topicPartition, partitionData) =>
-            try {
-              if (partitionData.committedMetadata() != null
-                && partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-              else {
-                zkSupport.zkClient.setOrCreateConsumerOffset(
-                  offsetCommitRequest.data.groupId,
-                  topicPartition,
-                  partitionData.committedOffset)
-                (topicPartition, Errors.NONE)
-              }
-            } catch {
-              case e: Throwable => (topicPartition, Errors.forException(e))
-            }
-        }
-        sendResponseCallback(responseInfo)
+      if (authorizedTopicsRequest.isEmpty) {
+        requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+        CompletableFuture.completedFuture(())
+      } else if (request.header.apiVersion == 0) {
+        // For version 0, always store offsets to ZK.
+        commitOffsetsToZookeeper(
+          request,
+          offsetCommitRequest,
+          authorizedTopicsRequest,
+          responseBuilder
+        )
       } else {
-        // for version 1 and beyond store offsets in offset manager
-
-        // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
-        // expire timestamp is computed differently for v1 and v2.
-        //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
-        //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that
-        //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
-        //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
-        val currentTimestamp = time.milliseconds
-        val partitionData = authorizedTopicRequestInfo.map { case (k, partitionData) =>
-          val metadata = if (partitionData.committedMetadata == null)
-            OffsetAndMetadata.NoMetadata
-          else
-            partitionData.committedMetadata
+        // For version > 0, store offsets to Coordinator.

Review Comment:
   nit: offsets in



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }
-        }
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava))
-    }
-
-      // reject the request if not authorized to the group
+    // Reject the request if not authorized to the group
     if (!authHelper.authorize(request.context, READ, GROUP, offsetCommitRequest.data.groupId)) {
-      val error = Errors.GROUP_AUTHORIZATION_FAILED
-      val responseTopicList = OffsetCommitRequest.getErrorResponseTopics(
-        offsetCommitRequest.data.topics,
-        error)
-
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(
-        new OffsetCommitResponseData()
-            .setTopics(responseTopicList)
-            .setThrottleTimeMs(requestThrottleMs)
-      ))
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
       // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
       // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
       // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
-      val errorMap = new mutable.HashMap[TopicPartition, Errors]
-      for (topicData <- offsetCommitRequest.data.topics.asScala) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          errorMap += topicPartition -> Errors.UNSUPPORTED_VERSION
-        }
-      }
-      sendResponseCallback(errorMap.toMap)
+      requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
     } else {
-      val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequestData.OffsetCommitRequestPartition]
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetCommitRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetCommitResponse.Builder()
+      val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
+      offsetCommitRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topic.name)
 
-      val topics = offsetCommitRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
-      for (topicData <- topics) {
-        for (partitionData <- topicData.partitions.asScala) {
-          val topicPartition = new TopicPartition(topicData.name, partitionData.partitionIndex)
-          if (!authorizedTopics.contains(topicData.name))
-            unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-          else if (!metadataCache.contains(topicPartition))
-            nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
-          else
-            authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
+          topic.partitions.forEach { partition =>
+            if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {
+              topicWithValidPartitions.partitions.add(partition)
+            } else {
+              responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+            }
+          }
+
+          if (!topicWithValidPartitions.partitions.isEmpty) {
+            authorizedTopicsRequest += topicWithValidPartitions
+          }
         }
       }
 
-      val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result()
-
-      if (authorizedTopicRequestInfo.isEmpty)
-        sendResponseCallback(Map.empty)
-      else if (header.apiVersion == 0) {
-        // for version 0 always store offsets to ZK
-        val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset commit requests"))
-        val responseInfo = authorizedTopicRequestInfo.map {
-          case (topicPartition, partitionData) =>
-            try {
-              if (partitionData.committedMetadata() != null
-                && partitionData.committedMetadata().length > config.offsetMetadataMaxSize)
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-              else {
-                zkSupport.zkClient.setOrCreateConsumerOffset(
-                  offsetCommitRequest.data.groupId,
-                  topicPartition,
-                  partitionData.committedOffset)
-                (topicPartition, Errors.NONE)
-              }
-            } catch {
-              case e: Throwable => (topicPartition, Errors.forException(e))
-            }
-        }
-        sendResponseCallback(responseInfo)
+      if (authorizedTopicsRequest.isEmpty) {
+        requestHelper.sendMaybeThrottle(request, responseBuilder.build())
+        CompletableFuture.completedFuture(())
+      } else if (request.header.apiVersion == 0) {
+        // For version 0, always store offsets to ZK.

Review Comment:
   nit: offsets in



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -436,4 +440,74 @@ class GroupCoordinatorAdapterTest {
 
     assertEquals(expectedResults, future.get())
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)
+      .setTopics(List(
+        new OffsetCommitRequestData.OffsetCommitRequestTopic()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommitTimestamp(now)

Review Comment:
   can we test values less than -1?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,79 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,

Review Comment:
   I noticed this isn't used here, nor in the other coordinator APIs (other than joinGroup). What's the reason for having this parameter? Are we expecting to use it in the new group coordinator?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,79 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val currentTimeMs = time.milliseconds
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        val topic = byTopics.get(tp.topic) match {
+          case Some(existingTopic) =>
+            existingTopic
+          case None =>
+            val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+            byTopics += tp.topic -> newTopic
+            response.topics.add(newTopic)
+            newTopic
+        }
+
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is defined as now + retention. The retention may be overridden
+    // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit
+    // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that.
+    val expireTimeMs = request.retentionTimeMs match {
+      case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+      case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
+    }
+
+    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        partitions += tp -> new OffsetAndMetadata(
+          offset = partition.committedOffset,
+          leaderEpoch = partition.committedLeaderEpoch match {
+            case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
+            case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch)
+          },
+          metadata = partition.committedMetadata match {
+            case null => OffsetAndMetadata.NoMetadata
+            case metadata => metadata
+          },
+          commitTimestamp = partition.commitTimestamp match {
+            case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
+            case customTimestamp => customTimestamp
+          },

Review Comment:
   I wasn't able to find where we validate the commit timestamp. How do we handle timestamps that are less than -1? I am also curious about `retentionTimeMs`.
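   
   For reference, a guard could look like this (a hypothetical sketch, not something the PR adds; `InvalidRequestException` is from `org.apache.kafka.common.errors`):
   
   ```
   // -1 is the "not provided" sentinel, so anything below it is a malformed request.
   if (partition.commitTimestamp < OffsetCommitRequest.DEFAULT_TIMESTAMP)
     throw new InvalidRequestException(
       s"Invalid commit timestamp ${partition.commitTimestamp} for ${topic.name}-${partition.partitionIndex}")
   ```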



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -411,137 +411,144 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset commit request
    */
-  def handleOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
-    val header = request.header
+  def handleOffsetCommitRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetCommitRequest = request.body[OffsetCommitRequest]
 
-    val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-    val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]): Unit = {
-      val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors
-      if (isDebugEnabled)
-        combinedCommitStatus.forKeyValue { (topicPartition, error) =>
-          if (error != Errors.NONE) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${error.exceptionName}")
-          }

Review Comment:
   I think we lost this debug log; can we add it back, at least when the future from newGroupCoordinator.commitOffsets completes exceptionally? Consider also adding it for TOPIC_AUTHORIZATION_FAILED and UNKNOWN_TOPIC_OR_PARTITION errors.
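   
   Something along these lines, as a sketch (assuming the handler keeps the coordinator's future in a `future` val; not the PR's code):
   
   ```
   // Hypothetical: log failures when the coordinator future completes.
   future.whenComplete { (response: OffsetCommitResponseData, exception: Throwable) =>
     if (exception != null)
       debug(s"OffsetCommit with correlation id ${request.header.correlationId} from client " +
         s"${request.header.clientId} failed due to ${exception.getMessage}")
   }
   ```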



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##########
@@ -436,4 +440,74 @@ class GroupCoordinatorAdapterTest {
 
     assertEquals(expectedResults, future.get())
   }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+  def testCommitOffsets(version: Short): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val time = new MockTime()
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, time)
+    val now = time.milliseconds()
+
+    val ctx = makeContext(ApiKeys.OFFSET_COMMIT, version)
+    val data = new OffsetCommitRequestData()
+      .setGroupId("group")
+      .setMemberId("member")
+      .setGenerationId(10)
+      .setRetentionTimeMs(1000)

Review Comment:
   can we test values less than -1?
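   
   For instance (a hypothetical case, reusing this test's names):
   
   ```
   // Based on the match in the adapter, a value below the -1 sentinel is not
   // rejected today: the expiration would be computed as now + (-2).
   val dataWithBadRetention = new OffsetCommitRequestData()
     .setGroupId("group")
     .setRetentionTimeMs(-2L)
   ```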





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1066981875


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(

Review Comment:
   That could work as well, but I personally prefer `getOrCreate` in this case. `getOrCreate` is used pretty extensively in the code base as well.





[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1068220054


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        val topic = byTopics.get(tp.topic) match {
+          case Some(existingTopic) =>
+            existingTopic
+          case None =>
+            val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+            byTopics += tp.topic -> newTopic
+            response.topics.add(newTopic)
+            newTopic
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   Yeah, I thought it was implicit in the comment that the commit time replaces "now" in this case. I think we can leave it as it is.


