You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/10 10:33:27 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #12886: KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface

dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1065606143


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,78 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        var topic = byTopics(tp.topic)
+        if (topic == null) {
+          topic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+          byTopics += tp.topic -> topic
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
+    //   - If v1 and explicit retention time is provided we calculate expiration timestamp based on that

Review Comment:
   I rewrote the comment. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org