You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/09/08 09:14:04 UTC

[kafka] branch 3.0 updated: KAFKA-13258/13259/13260: Fix error response generation (#11300)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 77ae47d  KAFKA-13258/13259/13260: Fix error response generation (#11300)
77ae47d is described below

commit 77ae47d85b310eaf2902bc9843d870faf44203f3
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Sep 8 11:12:55 2021 +0200

    KAFKA-13258/13259/13260: Fix error response generation (#11300)
    
    AlterClientQuotas, DescribeProducers and FindCoordinator have issues when building error responses. This can lead to brokers returning responses that report no error even when one has occurred.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../kafka/common/requests/AlterClientQuotasRequest.java       |  7 ++++++-
 .../kafka/common/requests/DescribeProducersRequest.java       |  1 +
 .../apache/kafka/common/requests/FindCoordinatorResponse.java | 11 ++++++++++-
 .../org/apache/kafka/common/requests/RequestResponseTest.java |  5 +++++
 4 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
index d03c267..3b06348 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.message.AlterClientQuotasRequestData.OpData;
 import org.apache.kafka.common.message.AlterClientQuotasResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 
@@ -117,6 +118,7 @@ public class AlterClientQuotasRequest extends AbstractRequest {
 
     @Override
     public AlterClientQuotasResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Errors error = Errors.forException(e);
         List<AlterClientQuotasResponseData.EntryData> responseEntries = new ArrayList<>();
         for (EntryData entryData : data.entries()) {
             List<AlterClientQuotasResponseData.EntityData> responseEntities = new ArrayList<>();
@@ -125,7 +127,10 @@ public class AlterClientQuotasRequest extends AbstractRequest {
                     .setEntityType(entityData.entityType())
                     .setEntityName(entityData.entityName()));
             }
-            responseEntries.add(new AlterClientQuotasResponseData.EntryData().setEntity(responseEntities));
+            responseEntries.add(new AlterClientQuotasResponseData.EntryData()
+                .setEntity(responseEntities)
+                .setErrorCode(error.code())
+                .setErrorMessage(error.message()));
         }
         AlterClientQuotasResponseData responseData = new AlterClientQuotasResponseData()
                 .setThrottleTimeMs(throttleTimeMs)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
index 77e7ecc..39aab22 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
@@ -80,6 +80,7 @@ public class DescribeProducersRequest extends AbstractRequest {
                         .setErrorCode(error.code())
                 );
             }
+            response.topics().add(topicResponse);
         }
         return new DescribeProducersResponse(response);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 156277a..080ba24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -73,7 +74,15 @@ public class FindCoordinatorResponse extends AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error());
+        if (!data.coordinators().isEmpty()) {
+            Map<Errors, Integer> errorCounts = new HashMap<>();
+            for (Coordinator coordinator : data.coordinators()) {
+                updateErrorCounts(errorCounts, Errors.forCode(coordinator.errorCode()));
+            }
+            return errorCounts;
+        } else {
+            return errorCounts(error());
+        }
     }
 
     public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 887d15c..118a424 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -706,6 +706,11 @@ public class RequestResponseTest {
     private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
         AbstractResponse response = req.getErrorResponse(e);
         checkResponse(response, req.version(), checkEqualityAndHashCode);
+        Errors error = Errors.forException(e);
+        Map<Errors, Integer> errorCounts = response.errorCounts();
+        assertEquals(Collections.singleton(error), errorCounts.keySet(),
+            "API Key " + req.apiKey().name + " v" + req.version() + " failed errorCounts test");
+        assertTrue(errorCounts.get(error) > 0);
         if (e instanceof UnknownServerException) {
             String responseStr = response.toString();
             assertFalse(responseStr.contains(e.getMessage()),