You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/02/23 17:32:54 UTC

[kafka] branch trunk updated: KAFKA-14744; NPE while converting OffsetFetch from version < 8 to version >= 8 (#13295)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c55e5afb836 KAFKA-14744; NPE while converting OffsetFetch from version < 8 to version >= 8 (#13295)
c55e5afb836 is described below

commit c55e5afb836e7f1fed749b9d02a73bc83fdf52fd
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Feb 23 18:32:33 2023 +0100

    KAFKA-14744; NPE while converting OffsetFetch from version < 8 to version >= 8 (#13295)
    
    While refactoring the OffsetFetch handling in KafkaApis, we introduced a NullPointerException (NPE). The NPE arises when the OffsetFetch API is called by a client using a version older than version 8 that passes null for the topics to signal that all topic-partition offsets must be returned. This means that this bug mainly impacts admin tools; the consumer never passes null for the topics.
    
    This NPE is here: https://github.com/apache/kafka/commit/24a86423e9907b751d98fddc7196332feea2b48d#diff-0f2f19fd03e2fc5aa9618c607b432ea72e5aaa53866f07444269f38cb537f3feR237.
    
    We missed this during the refactor because we had no tests in place to test this mode.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Justine Olshan <jo...@confluent.io>
---
 .../kafka/common/requests/OffsetFetchRequest.java  | 24 +++++---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 71 ++++++++++++++++++++++
 .../unit/kafka/server/OffsetFetchRequestTest.scala | 52 +++++++++++++++-
 3 files changed, 138 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index edcad541dca..1321612d30a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -232,14 +232,22 @@ public class OffsetFetchRequest extends AbstractRequest {
             return data.groups();
         } else {
             OffsetFetchRequestData.OffsetFetchRequestGroup group =
-                new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
-
-            data.topics().forEach(topic -> {
-                group.topics().add(new OffsetFetchRequestTopics()
-                    .setName(topic.name())
-                    .setPartitionIndexes(topic.partitionIndexes())
-                );
-            });
+                new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                    .setGroupId(data.groupId());
+
+            if (data.topics() == null) {
+                // If topics is null, it means that all topic-partitions should
+                // be fetched hence we preserve it.
+                group.setTopics(null);
+            } else {
+                // Otherwise, topics are translated to the new structure.
+                data.topics().forEach(topic -> {
+                    group.topics().add(new OffsetFetchRequestTopics()
+                        .setName(topic.name())
+                        .setPartitionIndexes(topic.partitionIndexes())
+                    );
+                });
+            }
 
             return Collections.singletonList(group);
         }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5c8bad2c233..014e3ef0220 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4208,6 +4208,77 @@ class KafkaApisTest {
     assertEquals(expectedOffsetFetchResponse, response.data)
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. Version 1 does not support requests
+    // that fetch all offsets. We are not interested in testing these here.
+    if (version < 2) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      buildRequest(new OffsetFetchRequest.Builder(
+        "group-1",
+        false,
+        null, // all offsets.
+        false
+      ).build(version))
+    }
+
+    val requestChannelRequest = makeRequest(version)
+
+    val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(groupCoordinator.fetchAllOffsets(
+      requestChannelRequest.context,
+      "group-1",
+      false
+    )).thenReturn(future)
+
+    createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+    val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-1")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(200)
+              .setCommittedLeaderEpoch(2)
+          ).asJava)
+      ).asJava)
+
+    val expectedOffsetFetchResponse = if (version >= 8) {
+      new OffsetFetchResponseData()
+        .setGroups(List(group1Response).asJava)
+    } else {
+      new OffsetFetchResponseData()
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopic()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(if (version >= 5) 1 else -1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(if (version >= 5) 2 else -1)
+            ).asJava)
+        ).asJava)
+    }
+
+    future.complete(group1Response.topics)
+
+    val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+    assertEquals(expectedOffsetFetchResponse, response.data)
+  }
+
   @Test
   def testHandleOffsetFetchAuthorization(): Unit = {
     def makeRequest(version: Short): RequestChannel.Request = {
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index c0a4355c63b..cf1c52841e5 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
 import java.util
 import java.util.Collections.singletonList
 import scala.jdk.CollectionConverters._
-import java.util.{Optional, Properties}
+import java.util.{Collections, Optional, Properties}
 
 class OffsetFetchRequestTest extends BaseRequestTest {
 
@@ -126,6 +126,56 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     }
   }
 
+  @Test
+  def testOffsetFetchRequestAllOffsetsSingleGroup(): Unit = {
+    createTopic(topic)
+
+    val tpList = singletonList(new TopicPartition(topic, 0))
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    commitOffsets(tpList)
+
+    // Testing from version 2 onward since versions 0 and 1 do not support
+    // fetching all offsets.
+    for (version <- 2 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+      if (version < 8) {
+        val request = new OffsetFetchRequest.Builder(
+          groupId,
+          false,
+          null,
+          version >= 7
+        ).build(version.toShort)
+
+        val response = connectAndReceive[OffsetFetchResponse](request)
+        assertEquals(Errors.NONE, response.error())
+        val topicData = response.data.topics().get(0)
+        val partitionData = topicData.partitions().get(0)
+        if (version < 3) {
+          assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs())
+        }
+        verifySingleGroupResponse(version.asInstanceOf[Short],
+          response.error().code(), partitionData.errorCode(), topicData.name(),
+          partitionData.partitionIndex(), partitionData.committedOffset(),
+          partitionData.committedLeaderEpoch(), partitionData.metadata())
+      } else {
+        val request = new OffsetFetchRequest.Builder(
+          Collections.singletonMap(groupId, null),
+          false,
+          false
+        ).build(version.toShort)
+
+        val response = connectAndReceive[OffsetFetchResponse](request)
+        assertEquals(Errors.NONE, response.groupLevelError(groupId))
+        val groupData = response.data().groups().get(0)
+        val topicData = groupData.topics().get(0)
+        val partitionData = topicData.partitions().get(0)
+        verifySingleGroupResponse(version.asInstanceOf[Short],
+          groupData.errorCode(), partitionData.errorCode(), topicData.name(),
+          partitionData.partitionIndex(), partitionData.committedOffset(),
+          partitionData.committedLeaderEpoch(), partitionData.metadata())
+      }
+    }
+  }
+
   @Test
   def testOffsetFetchRequestWithMultipleGroups(): Unit = {
     createTopic(topics(0))