You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/11/18 03:22:04 UTC

[kafka] branch 2.4 updated: KAFKA-9200: ListOffsetRequest missing error response for v5 (#7704)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 28af3e3  KAFKA-9200: ListOffsetRequest missing error response for v5 (#7704)
28af3e3 is described below

commit 28af3e30c1d88be567537a2d400adca29c8e21dc
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Sun Nov 17 19:18:20 2019 -0800

    KAFKA-9200: ListOffsetRequest missing error response for v5 (#7704)
    
    ListOffsetRequest.getErrorResponse is missing a case for version 5, which was introduced
    by 152292994e4 and released in 2.3.0.
    
    ```
    java.lang.IllegalArgumentException: Version 5 is not valid. Valid versions for ListOffsetRequest are 0 to 5
            at org.apache.kafka.common.requests.ListOffsetRequest.getErrorResponse(ListOffsetRequest.java:282)
            at kafka.server.KafkaApis.sendErrorOrCloseConnection(KafkaApis.scala:3062)
            at kafka.server.KafkaApis.sendErrorResponseMaybeThrottle(KafkaApis.scala:3045)
            at kafka.server.KafkaApis.handleError(KafkaApis.scala:3027)
            at kafka.server.KafkaApis.handle(KafkaApis.scala:209)
            at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../apache/kafka/common/requests/ListOffsetRequest.java   |  3 ++-
 .../apache/kafka/common/requests/RequestResponseTest.java | 15 +++++++--------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index e9fe942..47c47d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -269,12 +269,13 @@ public class ListOffsetRequest extends AbstractRequest {
             responseData.put(partition, partitionError);
         }
 
-        switch (version()) {
+        switch (versionId) {
             case 0:
             case 1:
             case 2:
             case 3:
             case 4:
+            case 5:
                 return new ListOffsetResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c8bd751..5934dc5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -209,12 +209,11 @@ public class RequestResponseTest {
         checkRequest(createDeleteGroupsRequest(), true);
         checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException(), true);
         checkResponse(createDeleteGroupsResponse(), 0, true);
-        checkRequest(createListOffsetRequest(1), true);
-        checkErrorResponse(createListOffsetRequest(1), new UnknownServerException(), true);
-        checkResponse(createListOffsetResponse(1), 1, true);
-        checkRequest(createListOffsetRequest(2), true);
-        checkErrorResponse(createListOffsetRequest(2), new UnknownServerException(), true);
-        checkResponse(createListOffsetResponse(2), 2, true);
+        for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) {
+            checkRequest(createListOffsetRequest(i), true);
+            checkErrorResponse(createListOffsetRequest(i), new UnknownServerException(), true);
+            checkResponse(createListOffsetResponse(i), i, true);
+        }
         checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
         checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
         checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
@@ -1096,7 +1095,7 @@ public class RequestResponseTest {
                     .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(offsetData)
                     .build((short) version);
-        } else if (version == 2) {
+        } else if (version >= 2 && version <= 5) {
             Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
                     new TopicPartition("test", 0),
                     new ListOffsetRequest.PartitionData(1000000L, Optional.of(5)));
@@ -1116,7 +1115,7 @@ public class RequestResponseTest {
             responseData.put(new TopicPartition("test", 0),
                     new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
             return new ListOffsetResponse(responseData);
-        } else if (version == 1 || version == 2) {
+        } else if (version >= 1 && version <= 5) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
             responseData.put(new TopicPartition("test", 0),
                     new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L, Optional.of(27)));