You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/09/08 09:11:51 UTC

[kafka] branch 2.8 updated: KAFKA-13258/KAFKA-13259: Fix error response generation (#11299)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new fa47f2a  KAFKA-13258/KAFKA-13259: Fix error response generation (#11299)
fa47f2a is described below

commit fa47f2a2cecb8f9791481123d36db95b1a1d3361
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Sep 8 11:10:33 2021 +0200

    KAFKA-13258/KAFKA-13259: Fix error response generation (#11299)
    
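    AlterClientQuotas and DescribeProducers requests build their error responses incorrectly: AlterClientQuotas does not set the error code and message on the response entries, and DescribeProducers never adds the per-topic responses to the response. As a result, brokers can return responses that carry no error even though one occurred. This change also fixes ApiKeys.allVersions(), which omitted the latest version.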
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../src/main/java/org/apache/kafka/common/protocol/ApiKeys.java    | 2 +-
 .../org/apache/kafka/common/requests/AlterClientQuotasRequest.java | 7 ++++++-
 .../org/apache/kafka/common/requests/DescribeProducersRequest.java | 1 +
 .../java/org/apache/kafka/common/requests/RequestResponseTest.java | 5 +++++
 4 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 475fc84..9796b59 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -199,7 +199,7 @@ public enum ApiKeys {
 
     public List<Short> allVersions() {
         List<Short> versions = new ArrayList<>(latestVersion() - oldestVersion() + 1);
-        for (short version = oldestVersion(); version < latestVersion(); version++) {
+        for (short version = oldestVersion(); version <= latestVersion(); version++) {
             versions.add(version);
         }
         return versions;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
index d03c267..3b06348 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.message.AlterClientQuotasRequestData.OpData;
 import org.apache.kafka.common.message.AlterClientQuotasResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 
@@ -117,6 +118,7 @@ public class AlterClientQuotasRequest extends AbstractRequest {
 
     @Override
     public AlterClientQuotasResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Errors error = Errors.forException(e);
         List<AlterClientQuotasResponseData.EntryData> responseEntries = new ArrayList<>();
         for (EntryData entryData : data.entries()) {
             List<AlterClientQuotasResponseData.EntityData> responseEntities = new ArrayList<>();
@@ -125,7 +127,10 @@ public class AlterClientQuotasRequest extends AbstractRequest {
                     .setEntityType(entityData.entityType())
                     .setEntityName(entityData.entityName()));
             }
-            responseEntries.add(new AlterClientQuotasResponseData.EntryData().setEntity(responseEntities));
+            responseEntries.add(new AlterClientQuotasResponseData.EntryData()
+                .setEntity(responseEntities)
+                .setErrorCode(error.code())
+                .setErrorMessage(error.message()));
         }
         AlterClientQuotasResponseData responseData = new AlterClientQuotasResponseData()
                 .setThrottleTimeMs(throttleTimeMs)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
index 77e7ecc..39aab22 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
@@ -80,6 +80,7 @@ public class DescribeProducersRequest extends AbstractRequest {
                         .setErrorCode(error.code())
                 );
             }
+            response.topics().add(topicResponse);
         }
         return new DescribeProducersResponse(response);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index de88025..9346665 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -657,6 +657,11 @@ public class RequestResponseTest {
     private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
         AbstractResponse response = req.getErrorResponse(e);
         checkResponse(response, req.version(), checkEqualityAndHashCode);
+        Errors error = Errors.forException(e);
+        Map<Errors, Integer> errorCounts = response.errorCounts();
+        assertEquals(Collections.singleton(error), errorCounts.keySet(),
+            "API Key " + req.apiKey().name + " v" + req.version() + " failed errorCounts test");
+        assertTrue(errorCounts.get(error) > 0);
         if (e instanceof UnknownServerException) {
             String responseStr = response.toString();
             assertFalse(responseStr.contains(e.getMessage()),