You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/23 20:34:38 UTC

[GitHub] [kafka] dajac opened a new pull request, #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12902:
URL: https://github.com/apache/kafka/pull/12902

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1069791363


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   TIL that topics are ImplicitLinkedHashMultiCollection.Elements. 😅 
   I'm curious why this specific response gets this typing compared to similar request/response jsons. But good to know that this find operation works and is fairly efficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1067069590


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.utils.BufferSupplier
+
+import java.util.concurrent.CompletableFuture
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  val coordinator: GroupCoordinator
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(
+    context: RequestContext,
+    request: JoinGroupRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[JoinGroupResponseData] = {
+    val future = new CompletableFuture[JoinGroupResponseData]()
+
+    def callback(joinResult: JoinGroupResult): Unit = {
+      future.complete(new JoinGroupResponseData()
+        .setErrorCode(joinResult.error.code)
+        .setGenerationId(joinResult.generationId)
+        .setProtocolType(joinResult.protocolType.orNull)
+        .setProtocolName(joinResult.protocolName.orNull)
+        .setLeader(joinResult.leaderId)
+        .setSkipAssignment(joinResult.skipAssignment)
+        .setMemberId(joinResult.memberId)
+        .setMembers(joinResult.members.asJava)
+      )
+    }
+
+    val groupInstanceId = Option(request.groupInstanceId)
+
+    // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
+    // and groupInstanceId is configured to unknown.
+    val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
+
+    val protocols = request.protocols.valuesList.asScala.map { protocol =>
+      (protocol.name, protocol.metadata)
+    }.toList
+
+    val supportSkippingAssignment = context.apiVersion >= 9
+
+    coordinator.handleJoinGroup(
+      request.groupId,
+      request.memberId,
+      groupInstanceId,
+      requireKnownMemberId,
+      supportSkippingAssignment,
+      context.clientId,
+      context.clientAddress.toString,
+      request.rebalanceTimeoutMs,
+      request.sessionTimeoutMs,
+      request.protocolType,
+      protocols,
+      callback,
+      Option(request.reason),
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)
+        if (topic == null) {
+          topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+          .setPartitionIndex(topicPartition.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    future

Review Comment:
   The concept is not new in Kafka. This is how we implemented the quorum controller. I follow the same patterns that we have introduced there. I agree that future are not strictly necessary. We could keep using callbacks as today. I find Future nicer to work with, personally.
   
   I don't have a detailed design doc for this part as we usually treat this as implementation details. However, I will have a first implementation of the core building block of the new group coordinator soon. This will help to understand the new architecture. Does it work for you?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1066375643


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.utils.BufferSupplier
+
+import java.util.concurrent.CompletableFuture
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  val coordinator: GroupCoordinator
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(
+    context: RequestContext,
+    request: JoinGroupRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[JoinGroupResponseData] = {
+    val future = new CompletableFuture[JoinGroupResponseData]()
+
+    def callback(joinResult: JoinGroupResult): Unit = {
+      future.complete(new JoinGroupResponseData()
+        .setErrorCode(joinResult.error.code)
+        .setGenerationId(joinResult.generationId)
+        .setProtocolType(joinResult.protocolType.orNull)
+        .setProtocolName(joinResult.protocolName.orNull)
+        .setLeader(joinResult.leaderId)
+        .setSkipAssignment(joinResult.skipAssignment)
+        .setMemberId(joinResult.memberId)
+        .setMembers(joinResult.members.asJava)
+      )
+    }
+
+    val groupInstanceId = Option(request.groupInstanceId)
+
+    // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
+    // and groupInstanceId is configured to unknown.
+    val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
+
+    val protocols = request.protocols.valuesList.asScala.map { protocol =>
+      (protocol.name, protocol.metadata)
+    }.toList
+
+    val supportSkippingAssignment = context.apiVersion >= 9
+
+    coordinator.handleJoinGroup(
+      request.groupId,
+      request.memberId,
+      groupInstanceId,
+      requireKnownMemberId,
+      supportSkippingAssignment,
+      context.clientId,
+      context.clientAddress.toString,
+      request.rebalanceTimeoutMs,
+      request.sessionTimeoutMs,
+      request.protocolType,
+      protocols,
+      callback,
+      Option(request.reason),
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)
+        if (topic == null) {
+          topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+          .setPartitionIndex(topicPartition.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    future

Review Comment:
   From the KIP-848: ```This KIP proposes to evolve the group coordinator to rely on an event loop. The rational of using an event loop is that 1) it simplifies the concurrency and 2) enables simulation testing. The group coordinator will have a replicated state machine per __consumer_offsets partitions, where each replicated state machine is modelled as an event loop. Those replicated state machines will be executed in group.coordinator.threads threads.``` 
   
   Not sure I can easily see from that intent that we need to use futures.  E.g. request threads use event loop without futures, so I'm curious how the event loop with futures is going to look like (this is not to say that it's hard to do, but looks like a new concept in Kafka, so it would be good to see a more detailed design,).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1070933822


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2764,37 +2763,170 @@ class KafkaApisTest {
     val request = buildRequest(offsetDeleteRequest)
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(groupCoordinator.handleDeleteOffsets(
-      ArgumentMatchers.eq(group),
-      ArgumentMatchers.eq(Seq(
-        new TopicPartition("topic-1", 0),
-        new TopicPartition("topic-1", 1),
-        new TopicPartition("topic-2", 0),
-        new TopicPartition("topic-2", 1)
-      )),
-      ArgumentMatchers.eq(requestLocal)
-    )).thenReturn((Errors.NONE, Map(
-      new TopicPartition("topic-1", 0) -> Errors.NONE,
-      new TopicPartition("topic-1", 1) -> Errors.NONE,
-      new TopicPartition("topic-2", 0) -> Errors.NONE,
-      new TopicPartition("topic-2", 1) -> Errors.NONE,
-    )))
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+    when(newGroupCoordinator.deleteOffsets(
+      request.context,
+      offsetDeleteRequest.data,
+      requestLocal.bufferSupplier
+    )).thenReturn(future)
 
     createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
+    val offsetDeleteResponseData = new OffsetDeleteResponseData()
+      .setTopics(new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(List(
+        new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+          .setName("topic-1")

Review Comment:
   good catch. fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1068450233


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3110,61 +3110,69 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleOffsetDeleteRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleOffsetDeleteRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetDeleteRequest = request.body[OffsetDeleteRequest]
-    val groupId = offsetDeleteRequest.data.groupId
 
-    if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
-      val topics = offsetDeleteRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
+    if (!authHelper.authorize(request.context, DELETE, GROUP, offsetDeleteRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, offsetDeleteRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetDeleteRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetDeleteResponse.Builder
+      val authorizedTopicPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection()
+      offsetDeleteRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name)
 
-      val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
-      val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
+          topic.partitions.forEach { partition =>

Review Comment:
   I think that we did here: https://github.com/apache/kafka/pull/12902/files#diff-cc056b4960ededba37a438b1454f0f3c5ff5e8ad5e6d2ec9a08e813ca056ffebL3130.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1071419097


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   aha, that makes sense. thanks! i noticed the changes in OffsetDeleteResponse.java were a bit different than the other APIs as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1069967097


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   That’s correct. The « map key » does that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1070186244


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   It was part of the protocol from the start. It basically means that when we deserialize, it builds a collection where the elements can be looked up via hashing. That's how we can use "find" here. (Not a new change, but I haven't seen it used often.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1065738658


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.utils.BufferSupplier
+
+import java.util.concurrent.CompletableFuture
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  val coordinator: GroupCoordinator
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(

Review Comment:
   I have added javadocs in the interface. It does not seem necessary to duplicate it here. Regarding the code, it is not new in this class. It is mainly "copy-pasted" with a few minor adjustments from KafkaApis.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1071515902


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java:
##########
@@ -44,6 +48,83 @@
  */
 public class OffsetDeleteResponse extends AbstractResponse {
 
+    public static class Builder {
+        OffsetDeleteResponseData data = new OffsetDeleteResponseData();
+
+        private OffsetDeleteResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetDeleteResponseTopic topic = data.topics().find(topicName);
+            if (topic == null) {
+                topic = new OffsetDeleteResponseTopic().setName(topicName);
+                data.topics().add(topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetDeleteResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetDeleteResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetDeleteResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetDeleteResponseTopic existingTopic = data.topics().find(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic.duplicate());
+                    } else {
+                        // Otherwise, we add the partitions to the existing one. Note we
+                        // expect non-overlapping partitions here as we don't verify
+                        // if the partition is already in the list before adding it.
+                        newTopic.partitions().forEach(partition -> {
+                            existingTopic.partitions().add(partition.duplicate());

Review Comment:
   was wondering why we were duplicating and realized that we need to reset the element's prev and next values to insert



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1070186362


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   Map key is defined in the json for the request and response objects. Because it is define the message generator code gives this typing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac merged pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12902:
URL: https://github.com/apache/kafka/pull/12902


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1036399725


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.utils.BufferSupplier
+
+import java.util.concurrent.CompletableFuture
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  val coordinator: GroupCoordinator
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(

Review Comment:
   As we starting a fairly new piece of code, can we add JavaDocs for all methods and a little bit more comments in the code?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.utils.BufferSupplier
+
+import java.util.concurrent.CompletableFuture
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  val coordinator: GroupCoordinator
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(
+    context: RequestContext,
+    request: JoinGroupRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[JoinGroupResponseData] = {
+    val future = new CompletableFuture[JoinGroupResponseData]()
+
+    def callback(joinResult: JoinGroupResult): Unit = {
+      future.complete(new JoinGroupResponseData()
+        .setErrorCode(joinResult.error.code)
+        .setGenerationId(joinResult.generationId)
+        .setProtocolType(joinResult.protocolType.orNull)
+        .setProtocolName(joinResult.protocolName.orNull)
+        .setLeader(joinResult.leaderId)
+        .setSkipAssignment(joinResult.skipAssignment)
+        .setMemberId(joinResult.memberId)
+        .setMembers(joinResult.members.asJava)
+      )
+    }
+
+    val groupInstanceId = Option(request.groupInstanceId)
+
+    // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
+    // and groupInstanceId is configured to unknown.
+    val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
+
+    val protocols = request.protocols.valuesList.asScala.map { protocol =>
+      (protocol.name, protocol.metadata)
+    }.toList
+
+    val supportSkippingAssignment = context.apiVersion >= 9
+
+    coordinator.handleJoinGroup(
+      request.groupId,
+      request.memberId,
+      groupInstanceId,
+      requireKnownMemberId,
+      supportSkippingAssignment,
+      context.clientId,
+      context.clientAddress.toString,
+      request.rebalanceTimeoutMs,
+      request.sessionTimeoutMs,
+      request.protocolType,
+      protocols,
+      callback,
+      Option(request.reason),
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)
+        if (topic == null) {
+          topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+          .setPartitionIndex(topicPartition.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    future

Review Comment:
   Looks like we're moving to a different async model, is there a design doc to read about this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12902:
URL: https://github.com/apache/kafka/pull/12902#issuecomment-1381512820

   cc @jolshan @jeffkbkim 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on PR #12902:
URL: https://github.com/apache/kafka/pull/12902#issuecomment-1384325991

   @jolshan @jeffkbkim This one is ready for the last round.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] artemlivshits commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1067448086


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.utils.BufferSupplier
+
+import java.util.concurrent.CompletableFuture
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  val coordinator: GroupCoordinator
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(
+    context: RequestContext,
+    request: JoinGroupRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[JoinGroupResponseData] = {
+    val future = new CompletableFuture[JoinGroupResponseData]()
+
+    def callback(joinResult: JoinGroupResult): Unit = {
+      future.complete(new JoinGroupResponseData()
+        .setErrorCode(joinResult.error.code)
+        .setGenerationId(joinResult.generationId)
+        .setProtocolType(joinResult.protocolType.orNull)
+        .setProtocolName(joinResult.protocolName.orNull)
+        .setLeader(joinResult.leaderId)
+        .setSkipAssignment(joinResult.skipAssignment)
+        .setMemberId(joinResult.memberId)
+        .setMembers(joinResult.members.asJava)
+      )
+    }
+
+    val groupInstanceId = Option(request.groupInstanceId)
+
+    // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
+    // and groupInstanceId is configured to unknown.
+    val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
+
+    val protocols = request.protocols.valuesList.asScala.map { protocol =>
+      (protocol.name, protocol.metadata)
+    }.toList
+
+    val supportSkippingAssignment = context.apiVersion >= 9
+
+    coordinator.handleJoinGroup(
+      request.groupId,
+      request.memberId,
+      groupInstanceId,
+      requireKnownMemberId,
+      supportSkippingAssignment,
+      context.clientId,
+      context.clientAddress.toString,
+      request.rebalanceTimeoutMs,
+      request.sessionTimeoutMs,
+      request.protocolType,
+      protocols,
+      callback,
+      Option(request.reason),
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)
+        if (topic == null) {
+          topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+          .setPartitionIndex(topicPartition.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    future

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1065739439


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.RequestLocal
+import kafka.utils.Implicits.MapExtensionMethods
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.RequestContext
+import org.apache.kafka.common.utils.BufferSupplier
+
+import java.util.concurrent.CompletableFuture
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * GroupCoordinatorAdapter is a thin wrapper around kafka.coordinator.group.GroupCoordinator
+ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface.
+ */
+class GroupCoordinatorAdapter(
+  val coordinator: GroupCoordinator
+) extends org.apache.kafka.coordinator.group.GroupCoordinator {
+
+  override def joinGroup(
+    context: RequestContext,
+    request: JoinGroupRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[JoinGroupResponseData] = {
+    val future = new CompletableFuture[JoinGroupResponseData]()
+
+    def callback(joinResult: JoinGroupResult): Unit = {
+      future.complete(new JoinGroupResponseData()
+        .setErrorCode(joinResult.error.code)
+        .setGenerationId(joinResult.generationId)
+        .setProtocolType(joinResult.protocolType.orNull)
+        .setProtocolName(joinResult.protocolName.orNull)
+        .setLeader(joinResult.leaderId)
+        .setSkipAssignment(joinResult.skipAssignment)
+        .setMemberId(joinResult.memberId)
+        .setMembers(joinResult.members.asJava)
+      )
+    }
+
+    val groupInstanceId = Option(request.groupInstanceId)
+
+    // Only return MEMBER_ID_REQUIRED error if joinGroupRequest version is >= 4
+    // and groupInstanceId is configured to unknown.
+    val requireKnownMemberId = context.apiVersion >= 4 && groupInstanceId.isEmpty
+
+    val protocols = request.protocols.valuesList.asScala.map { protocol =>
+      (protocol.name, protocol.metadata)
+    }.toList
+
+    val supportSkippingAssignment = context.apiVersion >= 9
+
+    coordinator.handleJoinGroup(
+      request.groupId,
+      request.memberId,
+      groupInstanceId,
+      requireKnownMemberId,
+      supportSkippingAssignment,
+      context.clientId,
+      context.clientAddress.toString,
+      request.rebalanceTimeoutMs,
+      request.sessionTimeoutMs,
+      request.protocolType,
+      protocols,
+      callback,
+      Option(request.reason),
+      RequestLocal(bufferSupplier)
+    )
+
+    future
+  }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)
+        if (topic == null) {
+          topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
+          response.topics.add(topic)
+        }
+        topic.partitions.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+          .setPartitionIndex(topicPartition.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    future

Review Comment:
   KIP-848 is the source of truth. The KIP explains that we want to move to a event loop based group coordinator. This is why all the methods are using futures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1071450200


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   It is almost the same. The main difference is that we don't need another Map to lookup topics by their name because the ImplicitLinkedHashMultiCollection gives us this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1071531038


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java:
##########
@@ -44,6 +48,83 @@
  */
 public class OffsetDeleteResponse extends AbstractResponse {
 
+    public static class Builder {
+        OffsetDeleteResponseData data = new OffsetDeleteResponseData();
+
+        private OffsetDeleteResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetDeleteResponseTopic topic = data.topics().find(topicName);
+            if (topic == null) {
+                topic = new OffsetDeleteResponseTopic().setName(topicName);
+                data.topics().add(topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetDeleteResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetDeleteResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetDeleteResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetDeleteResponseTopic existingTopic = data.topics().find(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic.duplicate());
+                    } else {
+                        // Otherwise, we add the partitions to the existing one. Note we
+                        // expect non-overlapping partitions here as we don't verify
+                        // if the partition is already in the list before adding it.
+                        newTopic.partitions().forEach(partition -> {
+                            existingTopic.partitions().add(partition.duplicate());

Review Comment:
   that’s right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1069791363


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   TIL that topics in this response are ImplicitLinkedHashMultiCollection.Elements. 😅 
   I'm curious why this specific response gets this typing compared to similar request/response jsons. But good to know that this find operation works and is fairly efficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12902:
URL: https://github.com/apache/kafka/pull/12902#issuecomment-1382451476

   Things are starting to fall into a pretty common pattern here. :) Looks pretty good, but I'll give Jeff a chance to take a look as well. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1070928680


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   Yeah, that's right. In the schema definition, you can specify that you want a map like data structure instead of a list. This is done by specifying the "mapKey" in the spec.
   
   As Justine said, this was done before this patch so I just kept it as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1069791363


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   TIL that topics in this response are ImplicitLinkedHashMultiCollection.Elements. 😅 
   I'm curious why this specific response gets this typing compared to similar request/response jsons -- I guess its maybe the "map key". But good to know that this find operation works and is fairly efficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1070170859


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2764,37 +2763,170 @@ class KafkaApisTest {
     val request = buildRequest(offsetDeleteRequest)
 
     val requestLocal = RequestLocal.withThreadConfinedCaching
-    when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(groupCoordinator.handleDeleteOffsets(
-      ArgumentMatchers.eq(group),
-      ArgumentMatchers.eq(Seq(
-        new TopicPartition("topic-1", 0),
-        new TopicPartition("topic-1", 1),
-        new TopicPartition("topic-2", 0),
-        new TopicPartition("topic-2", 1)
-      )),
-      ArgumentMatchers.eq(requestLocal)
-    )).thenReturn((Errors.NONE, Map(
-      new TopicPartition("topic-1", 0) -> Errors.NONE,
-      new TopicPartition("topic-1", 1) -> Errors.NONE,
-      new TopicPartition("topic-2", 0) -> Errors.NONE,
-      new TopicPartition("topic-2", 1) -> Errors.NONE,
-    )))
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+    when(newGroupCoordinator.deleteOffsets(
+      request.context,
+      offsetDeleteRequest.data,
+      requestLocal.bufferSupplier
+    )).thenReturn(future)
 
     createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
 
+    val offsetDeleteResponseData = new OffsetDeleteResponseData()
+      .setTopics(new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(List(
+        new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+          .setName("topic-1")

Review Comment:
   shouldn't we expect 
   topic-1 (partition-0, partition-1)
   topic-2 (partition-0, partition-1)
   
   in the response? why are we expecting
   topic-1 (partition-0, partition-0)?



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
       expireTimestamp = expireTimestamp
     )
   }
+
+  override def deleteOffsets(
+    context: RequestContext,
+    request: OffsetDeleteRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetDeleteResponseData] = {
+    val future = new CompletableFuture[OffsetDeleteResponseData]()
+
+    val partitions = mutable.ArrayBuffer[TopicPartition]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        partitions += new TopicPartition(topic.name, partition.partitionIndex)
+      }
+    }
+
+    val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
+      request.groupId,
+      partitions,
+      RequestLocal(bufferSupplier)
+    )
+
+    if (groupError != Errors.NONE) {
+      future.completeExceptionally(groupError.exception)
+    } else {
+      val response = new OffsetDeleteResponseData()
+      topicPartitionResults.forKeyValue { (topicPartition, error) =>
+        var topic = response.topics.find(topicPartition.topic)

Review Comment:
   i'm still confused on the reason behind using ImplicitLinkedHashMultiCollection - what do you mean by map key? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jolshan commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1070097639


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2817,14 +2949,18 @@ class KafkaApisTest {
           .setTopics(topics)
       ).build()
       val request = buildRequest(offsetDeleteRequest)
-      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-        any[Long])).thenReturn(0)
 
-      val requestLocal = RequestLocal.withThreadConfinedCaching
-      when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
-        ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.NONE, Map.empty[TopicPartition, Errors]))
+      // The group coordinator is called even if there are no
+      // topic-partitions left after the validation.
+      when(newGroupCoordinator.deleteOffsets(
+        request.context,
+        new OffsetDeleteRequestData().setGroupId(group),
+        RequestLocal.NoCaching.bufferSupplier
+      )).thenReturn(CompletableFuture.completedFuture(
+        new OffsetDeleteResponseData()
+      ))
 
-      createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
+      createKafkaApis().handleOffsetDeleteRequest(request, RequestLocal.NoCaching)

Review Comment:
   Was this changed for correctness or just to make things simpler?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1070115003


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2817,14 +2949,18 @@ class KafkaApisTest {
           .setTopics(topics)
       ).build()
       val request = buildRequest(offsetDeleteRequest)
-      when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-        any[Long])).thenReturn(0)
 
-      val requestLocal = RequestLocal.withThreadConfinedCaching
-      when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
-        ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.NONE, Map.empty[TopicPartition, Errors]))
+      // The group coordinator is called even if there are no
+      // topic-partitions left after the validation.
+      when(newGroupCoordinator.deleteOffsets(
+        request.context,
+        new OffsetDeleteRequestData().setGroupId(group),
+        RequestLocal.NoCaching.bufferSupplier
+      )).thenReturn(CompletableFuture.completedFuture(
+        new OffsetDeleteResponseData()
+      ))
 
-      createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
+      createKafkaApis().handleOffsetDeleteRequest(request, RequestLocal.NoCaching)

Review Comment:
   Just to make things simpler. The test basically verified that whatever is passed here is received by the group coordinator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] CalvinConfluent commented on a diff in pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
CalvinConfluent commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1067573213


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3110,61 +3110,69 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleOffsetDeleteRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleOffsetDeleteRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetDeleteRequest = request.body[OffsetDeleteRequest]
-    val groupId = offsetDeleteRequest.data.groupId
 
-    if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
-      val topics = offsetDeleteRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
+    if (!authHelper.authorize(request.context, DELETE, GROUP, offsetDeleteRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, offsetDeleteRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetDeleteRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetDeleteResponse.Builder
+      val authorizedTopicPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection()
+      offsetDeleteRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name)
 
-      val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
-      val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
+          topic.partitions.forEach { partition =>

Review Comment:
   In the old code, we seem not to check the existence of all the partitions. Is it an extra safety check here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org