You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/02/23 17:32:54 UTC
[kafka] branch trunk updated: KAFKA-14744; NPE while converting OffsetFetch from version < 8 to version >= 8 (#13295)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c55e5afb836 KAFKA-14744; NPE while converting OffsetFetch from version < 8 to version >= 8 (#13295)
c55e5afb836 is described below
commit c55e5afb836e7f1fed749b9d02a73bc83fdf52fd
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Feb 23 18:32:33 2023 +0100
KAFKA-14744; NPE while converting OffsetFetch from version < 8 to version >= 8 (#13295)
While refactoring the OffsetFetch handling in KafkaApis, we introduced a NullPointerException (NPE). The NPE arises when the OffsetFetch API is called by a client using a version older than version 8 and passing null for the topics to signal that all topic-partition offsets must be returned. This means that this bug mainly impacts admin tools. The consumer does not use null.
This NPE is here: https://github.com/apache/kafka/commit/24a86423e9907b751d98fddc7196332feea2b48d#diff-0f2f19fd03e2fc5aa9618c607b432ea72e5aaa53866f07444269f38cb537f3feR237.
We missed this during the refactor because we had no tests in place to test this mode.
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Justine Olshan <jo...@confluent.io>
---
.../kafka/common/requests/OffsetFetchRequest.java | 24 +++++---
.../scala/unit/kafka/server/KafkaApisTest.scala | 71 ++++++++++++++++++++++
.../unit/kafka/server/OffsetFetchRequestTest.scala | 52 +++++++++++++++-
3 files changed, 138 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index edcad541dca..1321612d30a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -232,14 +232,22 @@ public class OffsetFetchRequest extends AbstractRequest {
return data.groups();
} else {
OffsetFetchRequestData.OffsetFetchRequestGroup group =
- new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(data.groupId());
-
- data.topics().forEach(topic -> {
- group.topics().add(new OffsetFetchRequestTopics()
- .setName(topic.name())
- .setPartitionIndexes(topic.partitionIndexes())
- );
- });
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(data.groupId());
+
+ if (data.topics() == null) {
+ // If topics is null, it means that all topic-partitions should
+ // be fetched hence we preserve it.
+ group.setTopics(null);
+ } else {
+ // Otherwise, topics are translated to the new structure.
+ data.topics().forEach(topic -> {
+ group.topics().add(new OffsetFetchRequestTopics()
+ .setName(topic.name())
+ .setPartitionIndexes(topic.partitionIndexes())
+ );
+ });
+ }
return Collections.singletonList(group);
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5c8bad2c233..014e3ef0220 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4208,6 +4208,77 @@ class KafkaApisTest {
assertEquals(expectedOffsetFetchResponse, response.data)
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+ def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = {
+ // Version 0 gets offsets from Zookeeper. Version 1 does not support requests that
+ // fetch all offsets. We are not interested in testing these here.
+ if (version < 2) return
+
+ def makeRequest(version: Short): RequestChannel.Request = {
+ buildRequest(new OffsetFetchRequest.Builder(
+ "group-1",
+ false,
+ null, // all offsets.
+ false
+ ).build(version))
+ }
+
+ val requestChannelRequest = makeRequest(version)
+
+ val future = new CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+ when(groupCoordinator.fetchAllOffsets(
+ requestChannelRequest.context,
+ "group-1",
+ false
+ )).thenReturn(future)
+
+ createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+ val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+ .setGroupId("group-1")
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(1),
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(2)
+ ).asJava)
+ ).asJava)
+
+ val expectedOffsetFetchResponse = if (version >= 8) {
+ new OffsetFetchResponseData()
+ .setGroups(List(group1Response).asJava)
+ } else {
+ new OffsetFetchResponseData()
+ .setTopics(List(
+ new OffsetFetchResponseData.OffsetFetchResponseTopic()
+ .setName("foo")
+ .setPartitions(List(
+ new OffsetFetchResponseData.OffsetFetchResponsePartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)
+ .setCommittedLeaderEpoch(if (version >= 5) 1 else -1),
+ new OffsetFetchResponseData.OffsetFetchResponsePartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200)
+ .setCommittedLeaderEpoch(if (version >= 5) 2 else -1)
+ ).asJava)
+ ).asJava)
+ }
+
+ future.complete(group1Response.topics)
+
+ val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+ assertEquals(expectedOffsetFetchResponse, response.data)
+ }
+
@Test
def testHandleOffsetFetchAuthorization(): Unit = {
def makeRequest(version: Short): RequestChannel.Request = {
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index c0a4355c63b..cf1c52841e5 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import java.util
import java.util.Collections.singletonList
import scala.jdk.CollectionConverters._
-import java.util.{Optional, Properties}
+import java.util.{Collections, Optional, Properties}
class OffsetFetchRequestTest extends BaseRequestTest {
@@ -126,6 +126,56 @@ class OffsetFetchRequestTest extends BaseRequestTest {
}
}
+ @Test
+ def testOffsetFetchRequestAllOffsetsSingleGroup(): Unit = {
+ createTopic(topic)
+
+ val tpList = singletonList(new TopicPartition(topic, 0))
+ consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ commitOffsets(tpList)
+
+ // Testing from version 2 onward since versions 0 and 1 do not support
+ // fetching all offsets.
+ for (version <- 2 to ApiKeys.OFFSET_FETCH.latestVersion()) {
+ if (version < 8) {
+ val request = new OffsetFetchRequest.Builder(
+ groupId,
+ false,
+ null,
+ version >= 7
+ ).build(version.toShort)
+
+ val response = connectAndReceive[OffsetFetchResponse](request)
+ assertEquals(Errors.NONE, response.error())
+ val topicData = response.data.topics().get(0)
+ val partitionData = topicData.partitions().get(0)
+ if (version < 3) {
+ assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs())
+ }
+ verifySingleGroupResponse(version.asInstanceOf[Short],
+ response.error().code(), partitionData.errorCode(), topicData.name(),
+ partitionData.partitionIndex(), partitionData.committedOffset(),
+ partitionData.committedLeaderEpoch(), partitionData.metadata())
+ } else {
+ val request = new OffsetFetchRequest.Builder(
+ Collections.singletonMap(groupId, null),
+ false,
+ false
+ ).build(version.toShort)
+
+ val response = connectAndReceive[OffsetFetchResponse](request)
+ assertEquals(Errors.NONE, response.groupLevelError(groupId))
+ val groupData = response.data().groups().get(0)
+ val topicData = groupData.topics().get(0)
+ val partitionData = topicData.partitions().get(0)
+ verifySingleGroupResponse(version.asInstanceOf[Short],
+ groupData.errorCode(), partitionData.errorCode(), topicData.name(),
+ partitionData.partitionIndex(), partitionData.committedOffset(),
+ partitionData.committedLeaderEpoch(), partitionData.metadata())
+ }
+ }
+ }
+
@Test
def testOffsetFetchRequestWithMultipleGroups(): Unit = {
createTopic(topics(0))