You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/09/08 09:14:04 UTC
[kafka] branch 3.0 updated: KAFKA-13258/13259/13260: Fix error
response generation (#11300)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 77ae47d KAFKA-13258/13259/13260: Fix error response generation (#11300)
77ae47d is described below
commit 77ae47d85b310eaf2902bc9843d870faf44203f3
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Wed Sep 8 11:12:55 2021 +0200
KAFKA-13258/13259/13260: Fix error response generation (#11300)
AlterClientQuotas, DescribeProducers and FindCoordinator have issues when building error responses. This can lead to brokers returning responses without errors even when some have happened.
Reviewers: David Jacot <dj...@confluent.io>
---
.../kafka/common/requests/AlterClientQuotasRequest.java | 7 ++++++-
.../kafka/common/requests/DescribeProducersRequest.java | 1 +
.../apache/kafka/common/requests/FindCoordinatorResponse.java | 11 ++++++++++-
.../org/apache/kafka/common/requests/RequestResponseTest.java | 5 +++++
4 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
index d03c267..3b06348 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.message.AlterClientQuotasRequestData.OpData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -117,6 +118,7 @@ public class AlterClientQuotasRequest extends AbstractRequest {
@Override
public AlterClientQuotasResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ Errors error = Errors.forException(e);
List<AlterClientQuotasResponseData.EntryData> responseEntries = new ArrayList<>();
for (EntryData entryData : data.entries()) {
List<AlterClientQuotasResponseData.EntityData> responseEntities = new ArrayList<>();
@@ -125,7 +127,10 @@ public class AlterClientQuotasRequest extends AbstractRequest {
.setEntityType(entityData.entityType())
.setEntityName(entityData.entityName()));
}
- responseEntries.add(new AlterClientQuotasResponseData.EntryData().setEntity(responseEntities));
+ responseEntries.add(new AlterClientQuotasResponseData.EntryData()
+ .setEntity(responseEntities)
+ .setErrorCode(error.code())
+ .setErrorMessage(error.message()));
}
AlterClientQuotasResponseData responseData = new AlterClientQuotasResponseData()
.setThrottleTimeMs(throttleTimeMs)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
index 77e7ecc..39aab22 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
@@ -80,6 +80,7 @@ public class DescribeProducersRequest extends AbstractRequest {
.setErrorCode(error.code())
);
}
+ response.topics().add(topicResponse);
}
return new DescribeProducersResponse(response);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 156277a..080ba24 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -73,7 +74,15 @@ public class FindCoordinatorResponse extends AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error());
+ if (!data.coordinators().isEmpty()) {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (Coordinator coordinator : data.coordinators()) {
+ updateErrorCounts(errorCounts, Errors.forCode(coordinator.errorCode()));
+ }
+ return errorCounts;
+ } else {
+ return errorCounts(error());
+ }
}
public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 887d15c..118a424 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -706,6 +706,11 @@ public class RequestResponseTest {
private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
AbstractResponse response = req.getErrorResponse(e);
checkResponse(response, req.version(), checkEqualityAndHashCode);
+ Errors error = Errors.forException(e);
+ Map<Errors, Integer> errorCounts = response.errorCounts();
+ assertEquals(Collections.singleton(error), errorCounts.keySet(),
+ "API Key " + req.apiKey().name + " v" + req.version() + " failed errorCounts test");
+ assertTrue(errorCounts.get(error) > 0);
if (e instanceof UnknownServerException) {
String responseStr = response.toString();
assertFalse(responseStr.contains(e.getMessage()),