You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/11/18 03:22:04 UTC
[kafka] branch 2.4 updated: KAFKA-9200: ListOffsetRequest missing error response for v5 (#7704)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 28af3e3 KAFKA-9200: ListOffsetRequest missing error response for v5 (#7704)
28af3e3 is described below
commit 28af3e30c1d88be567537a2d400adca29c8e21dc
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Sun Nov 17 19:18:20 2019 -0800
KAFKA-9200: ListOffsetRequest missing error response for v5 (#7704)
ListOffsetRequest.getErrorResponse is missing a case for version 5, which was
introduced by 152292994e4 and released in 2.3.0.
```
java.lang.IllegalArgumentException: Version 5 is not valid. Valid versions for ListOffsetRequest are 0 to 5
at org.apache.kafka.common.requests.ListOffsetRequest.getErrorResponse(ListOffsetRequest.java:282)
at kafka.server.KafkaApis.sendErrorOrCloseConnection(KafkaApis.scala:3062)
at kafka.server.KafkaApis.sendErrorResponseMaybeThrottle(KafkaApis.scala:3045)
at kafka.server.KafkaApis.handleError(KafkaApis.scala:3027)
at kafka.server.KafkaApis.handle(KafkaApis.scala:209)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
at java.lang.Thread.run(Thread.java:748)
```
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../apache/kafka/common/requests/ListOffsetRequest.java | 3 ++-
.../apache/kafka/common/requests/RequestResponseTest.java | 15 +++++++--------
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index e9fe942..47c47d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -269,12 +269,13 @@ public class ListOffsetRequest extends AbstractRequest {
responseData.put(partition, partitionError);
}
- switch (version()) {
+ switch (versionId) {
case 0:
case 1:
case 2:
case 3:
case 4:
+ case 5:
return new ListOffsetResponse(throttleTimeMs, responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c8bd751..5934dc5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -209,12 +209,11 @@ public class RequestResponseTest {
checkRequest(createDeleteGroupsRequest(), true);
checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException(), true);
checkResponse(createDeleteGroupsResponse(), 0, true);
- checkRequest(createListOffsetRequest(1), true);
- checkErrorResponse(createListOffsetRequest(1), new UnknownServerException(), true);
- checkResponse(createListOffsetResponse(1), 1, true);
- checkRequest(createListOffsetRequest(2), true);
- checkErrorResponse(createListOffsetRequest(2), new UnknownServerException(), true);
- checkResponse(createListOffsetResponse(2), 2, true);
+ for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) {
+ checkRequest(createListOffsetRequest(i), true);
+ checkErrorResponse(createListOffsetRequest(i), new UnknownServerException(), true);
+ checkResponse(createListOffsetResponse(i), i, true);
+ }
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
@@ -1096,7 +1095,7 @@ public class RequestResponseTest {
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(offsetData)
.build((short) version);
- } else if (version == 2) {
+ } else if (version >= 2 && version <= 5) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
new TopicPartition("test", 0),
new ListOffsetRequest.PartitionData(1000000L, Optional.of(5)));
@@ -1116,7 +1115,7 @@ public class RequestResponseTest {
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
return new ListOffsetResponse(responseData);
- } else if (version == 1 || version == 2) {
+ } else if (version >= 1 && version <= 5) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L, Optional.of(27)));