You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/07 01:05:44 UTC

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

jeffkbkim commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1063877209


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only support one group."

Review Comment:
   nit: supports



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only support one group."
+                );
+            }
+
+            OffsetFetchResponseGroup group = groups.get(0);
+            data.setErrorCode(group.errorCode());
+            error = Errors.forCode(group.errorCode());
+
+            group.topics().forEach(topic -> {
+                OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name());
+                data.topics().add(newTopic);
+
+                topic.partitions().forEach(partition -> {
+                    OffsetFetchResponsePartition newPartition;
+
+                    if (version < 2 && group.errorCode() != Errors.NONE.code()) {
+                        // Versions prior to version 2 does not support a top level error. Therefore

Review Comment:
   nit: "does not" -> "do not", and "Therefore" -> "Therefore,"



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetFetchResponse
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): Unit = {
-    val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupId = offsetFetchRequest.groupId()
-    val (error, partitionData) = fetchOffsets(groupId, offsetFetchRequest.isAllPartitions,
-      offsetFetchRequest.requireStable, offsetFetchRequest.partitions, request.context)
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        if (error != Errors.NONE) {
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-        } else {
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, partitionData.asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    val groups = offsetFetchRequest.groups()
+    val requireStable = offsetFetchRequest.requireStable()
+
+    val futures = new mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+    groups.forEach { groupOffsetFetch =>
+      val isAllPartitions = groupOffsetFetch.topics == null
+      val future = if (isAllPartitions) {
+        fetchAllOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      } else {
+        fetchOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      }
+      futures += future
     }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-  }
-
-  private def handleOffsetFetchRequestV8AndAbove(request: RequestChannel.Request): Unit = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupIds = offsetFetchRequest.groupIds().asScala
-    val groupToErrorMap =  mutable.Map.empty[String, Errors]
-    val groupToPartitionData =  mutable.Map.empty[String, util.Map[TopicPartition, PartitionData]]
-    val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
-    groupIds.foreach(g => {
-      val (error, partitionData) = fetchOffsets(g,
-        offsetFetchRequest.isAllPartitionsForGroup(g),
-        offsetFetchRequest.requireStable(),
-        groupToTopicPartitions.get(g), request.context)
-      groupToErrorMap += (g -> error)
-      groupToPartitionData += (g -> partitionData.asJava)
-    })
 
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
-        groupToErrorMap.asJava, groupToPartitionData.asJava)
-      trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+      val groupResponses = new ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+      futures.foreach(future => groupResponses += future.get())
+      requestHelper.sendMaybeThrottle(request, new OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
     }
-
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, requireStable: Boolean,
-                           partitions: util.List[TopicPartition], context: RequestContext): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
-    if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
-      (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
-    } else {
-      if (isAllPartitions) {
-        val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable)
-        if (error != Errors.NONE) {
-          (error, allPartitionData)
-        } else {
-          // clients are not allowed to see offsets for topics that are not authorized for Describe
-          val (authorizedPartitionData, _) = authHelper.partitionMapByAuthorized(context,
-            DESCRIBE, TOPIC, allPartitionData)(_.topic)
-          (Errors.NONE, authorizedPartitionData)
-        }
+  private def fetchAllOffsets(

Review Comment:
   Naming these `fetchAllOffsetsForGroup` and `fetchOffsetsForGroup` would make it more readable for me. What do you think?
   
   Looking at the new `GroupCoordinator` interface, it seems to have counterpart methods (`fetchAllOffsets` and `fetchOffsets`). Thoughts on changing both?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetFetchResponse
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): Unit = {
-    val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupId = offsetFetchRequest.groupId()
-    val (error, partitionData) = fetchOffsets(groupId, offsetFetchRequest.isAllPartitions,
-      offsetFetchRequest.requireStable, offsetFetchRequest.partitions, request.context)
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        if (error != Errors.NONE) {
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-        } else {
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, partitionData.asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    val groups = offsetFetchRequest.groups()
+    val requireStable = offsetFetchRequest.requireStable()
+
+    val futures = new mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+    groups.forEach { groupOffsetFetch =>
+      val isAllPartitions = groupOffsetFetch.topics == null
+      val future = if (isAllPartitions) {
+        fetchAllOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      } else {
+        fetchOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      }
+      futures += future
     }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-  }
-
-  private def handleOffsetFetchRequestV8AndAbove(request: RequestChannel.Request): Unit = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupIds = offsetFetchRequest.groupIds().asScala
-    val groupToErrorMap =  mutable.Map.empty[String, Errors]
-    val groupToPartitionData =  mutable.Map.empty[String, util.Map[TopicPartition, PartitionData]]
-    val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
-    groupIds.foreach(g => {
-      val (error, partitionData) = fetchOffsets(g,
-        offsetFetchRequest.isAllPartitionsForGroup(g),
-        offsetFetchRequest.requireStable(),
-        groupToTopicPartitions.get(g), request.context)
-      groupToErrorMap += (g -> error)
-      groupToPartitionData += (g -> partitionData.asJava)
-    })
 
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
-        groupToErrorMap.asJava, groupToPartitionData.asJava)
-      trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+      val groupResponses = new ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+      futures.foreach(future => groupResponses += future.get())
+      requestHelper.sendMaybeThrottle(request, new OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
     }
-
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, requireStable: Boolean,
-                           partitions: util.List[TopicPartition], context: RequestContext): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
-    if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
-      (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
-    } else {
-      if (isAllPartitions) {
-        val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable)
-        if (error != Errors.NONE) {
-          (error, allPartitionData)
-        } else {
-          // clients are not allowed to see offsets for topics that are not authorized for Describe
-          val (authorizedPartitionData, _) = authHelper.partitionMapByAuthorized(context,
-            DESCRIBE, TOPIC, allPartitionData)(_.topic)
-          (Errors.NONE, authorizedPartitionData)
-        }
+  private def fetchAllOffsets(
+    requestContext: RequestContext,
+    groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    requireStable: Boolean
+  ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+    if (!authHelper.authorize(requestContext, DESCRIBE, GROUP, groupOffsetFetch.groupId)) {
+      return CompletableFuture.completedFuture(new OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId(groupOffsetFetch.groupId)
+        .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+    }

Review Comment:
   It seems like this block is repeated in `fetchOffsets()`. Can we move it up to `handleOffsetFetchRequestFromCoordinator()`?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        "group-2" -> null,
+        "group-3" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+    }
+
+    if (version < 8) {
+      // Request version earlier than version 8 do not support batching.

Review Comment:
   nit: do not support batching groups?



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only support one group."
+                );
+            }
+
+            OffsetFetchResponseGroup group = groups.get(0);
+            data.setErrorCode(group.errorCode());
+            error = Errors.forCode(group.errorCode());
+
+            group.topics().forEach(topic -> {
+                OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name());
+                data.topics().add(newTopic);
+
+                topic.partitions().forEach(partition -> {
+                    OffsetFetchResponsePartition newPartition;
+
+                    if (version < 2 && group.errorCode() != Errors.NONE.code()) {
+                        // Versions prior to version 2 does not support a top level error. Therefore
+                        // we put it at the partition level.
+                        newPartition = new OffsetFetchResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(group.errorCode());
+                    } else {

Review Comment:
   I'm a bit confused by the else branch. We can hit it either when version >= 2 or when the group error is NONE.
   
   Based on the comment above, it seems that for version >= 2 we don't have to put the error at the partition level. Also, do we need to add the offset/metadata if there is an error for version >= 2?



##########
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##########
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {

Review Comment:
   +1 for having duplicate responses if a group is repeated. I don't think this warrants an invalid request error, but we should conform to the norm if there is one.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        "group-2" -> null,
+        "group-3" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+    }
+
+    if (version < 8) {
+      // Request version earlier than version 8 do not support batching.
+      assertThrows(classOf[UnsupportedVersionException], () => makeRequest(version))
+    } else {
+      val requestChannelRequest = makeRequest(version)
+
+      val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchOffsets(
+        requestChannelRequest.context,
+        "group-1",
+        List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+          .setName("foo")
+          .setPartitionIndexes(List[Integer](0, 1).asJava)
+        ).asJava,
+        false
+      )).thenReturn(group1Future)
+
+      val group2Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchAllOffsets(
+        requestChannelRequest.context,
+        "group-2",
+        false
+      )).thenReturn(group2Future)
+
+      val group3Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchAllOffsets(
+        requestChannelRequest.context,
+        "group-3",
+        false
+      )).thenReturn(group3Future)
+
+      createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+      val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-1")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(2)
+            ).asJava)
+        ).asJava)
+
+      val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-2")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("bar")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(2),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(2)
+                .setCommittedOffset(300)
+                .setCommittedLeaderEpoch(3)
+            ).asJava)
+        ).asJava)
+
+      val group3Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-3")
+        .setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+      val expectedOffsetFetchResponse = new OffsetFetchResponseData()
+        .setGroups(List(group1Response, group2Response, group3Response).asJava)
+
+      group1Future.complete(group1Response.topics)
+      group2Future.complete(group2Response.topics)
+      group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
+
+      val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+      assertEquals(expectedOffsetFetchResponse, response.data)
+    }
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      buildRequest(new OffsetFetchRequest.Builder(
+        "group-1",
+        false,
+        List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        false
+      ).build(version))
+    }
+
+    val requestChannelRequest = makeRequest(version)
+
+    val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(newGroupCoordinator.fetchOffsets(
+      requestChannelRequest.context,
+      "group-1",
+      List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+        .setName("foo")
+        .setPartitionIndexes(List[Integer](0, 1).asJava)
+      ).asJava,
+      false
+    )).thenReturn(future)
+
+    createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+    val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-1")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(200)
+              .setCommittedLeaderEpoch(2)
+          ).asJava)
+      ).asJava)
+
+    val expectedOffsetFetchResponse = if (version >= 8) {
+      new OffsetFetchResponseData()
+        .setGroups(List(group1Response).asJava)
+    } else {
+      new OffsetFetchResponseData()
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopic()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(if (version >= 5) 1 else -1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(if (version >= 5) 2 else -1)
+            ).asJava)
+        ).asJava)
+    }
+
+    future.complete(group1Response.topics)
+
+    val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+    assertEquals(expectedOffsetFetchResponse, response.data)
+  }
+
+  @Test
+  def testHandleOffsetFetchAuthorization(): Unit = {
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("bar", 0)
+        ).asJava,
+        "group-2" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("bar", 0)
+        ).asJava,
+        "group-3" -> null,
+        "group-4" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+    }
+
+    val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+
+    val acls = Map(
+      "group-1" -> AuthorizationResult.ALLOWED,
+      "group-2" -> AuthorizationResult.DENIED,
+      "group-3" -> AuthorizationResult.ALLOWED,
+      "group-4" -> AuthorizationResult.DENIED,
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    )
+
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    // group-1 is allowed and bar is allowed.
+    val group1Future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(newGroupCoordinator.fetchOffsets(
+      requestChannelRequest.context,
+      "group-1",
+      List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+        .setName("bar")
+        .setPartitionIndexes(List[Integer](0).asJava)
+      ).asJava,
+      false
+    )).thenReturn(group1Future)
+
+    // group-3 is allows and bar is allowed.

Review Comment:
   nit: is allowed



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1338,27 +1337,22 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset fetch request
    */
-  def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+  def handleOffsetFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val version = request.header.apiVersion
     if (version == 0) {
-      // reading offsets from ZK
-      handleOffsetFetchRequestV0(request)
-    } else if (version >= 1 && version <= 7) {
-      // reading offsets from Kafka
-      handleOffsetFetchRequestBetweenV1AndV7(request)
+      handleOffsetFetchRequestFromZookeeper(request)

Review Comment:
   looks much more readable. nice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org