You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/07/17 08:26:17 UTC
[kafka] branch 2.6 updated: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new dd71437 MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030)
dd71437 is described below
commit dd71437de7675d92ad3e4ed01ac3ee11bf5da99d
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 17 08:55:53 2020 +0100
MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030)
Reviewers: Manikumar Reddy <ma...@gmail.com>, David Jacot <dj...@confluent.io>, Ron Dagostino <rd...@confluent.io>
---
.../common/requests/AlterClientQuotasResponse.java | 13 +-
.../org/apache/kafka/common/requests/ApiError.java | 5 +-
.../requests/DescribeClientQuotasResponse.java | 11 +-
.../common/requests/SaslAuthenticateRequest.java | 5 +-
.../kafka/common/requests/RequestResponseTest.java | 198 +++++++++++++--------
.../src/main/scala/kafka/admin/ConfigCommand.scala | 9 +
.../src/main/scala/kafka/server/AdminManager.scala | 12 +-
.../main/scala/kafka/server/DynamicConfig.scala | 13 +-
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 21 +++
.../kafka/server/ClientQuotasRequestTest.scala | 30 +++-
10 files changed, 220 insertions(+), 97 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
index a3b01bf..3e95671 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
@@ -20,10 +20,12 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntityData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.quota.ClientQuotaEntity;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -50,14 +52,13 @@ public class AlterClientQuotasResponse extends AbstractResponse {
}
public AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int throttleTimeMs, Throwable e) {
- short errorCode = Errors.forException(e).code();
- String errorMessage = e.getMessage();
+ ApiError apiError = ApiError.fromThrowable(e);
List<EntryData> entries = new ArrayList<>(entities.size());
for (ClientQuotaEntity entity : entities) {
entries.add(new EntryData()
- .setErrorCode(errorCode)
- .setErrorMessage(errorMessage)
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message())
.setEntity(toEntityData(entity)));
}
@@ -120,4 +121,8 @@ public class AlterClientQuotasResponse extends AbstractResponse {
}
return entityData;
}
+
+ public static AlterClientQuotasResponse parse(ByteBuffer buffer, short version) {
+ return new AlterClientQuotasResponse(ApiKeys.ALTER_CLIENT_QUOTAS.parseResponse(version, buffer), version);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 6cb09f0..5c9ca7b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -38,9 +38,10 @@ public class ApiError {
private final String message;
public static ApiError fromThrowable(Throwable t) {
- // Avoid populating the error message if it's a generic one
+ // Avoid populating the error message if it's a generic one. Also don't populate error
+ // message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information.
Errors error = Errors.forException(t);
- String message = error.message().equals(t.getMessage()) ? null : t.getMessage();
+ String message = error == Errors.UNKNOWN_SERVER_ERROR || error.message().equals(t.getMessage()) ? null : t.getMessage();
return new ApiError(error, message);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
index cb54b01..bda3673 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.quota.ClientQuotaEntity;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -66,10 +68,11 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
}
public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) {
+ ApiError apiError = ApiError.fromThrowable(e);
this.data = new DescribeClientQuotasResponseData()
.setThrottleTimeMs(throttleTimeMs)
- .setErrorCode(Errors.forException(e).code())
- .setErrorMessage(e.getMessage())
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message())
.setEntries(null);
}
@@ -115,4 +118,8 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
protected Struct toStruct(short version) {
return data.toStruct(version);
}
+
+ public static DescribeClientQuotasResponse parse(ByteBuffer buffer, short version) {
+ return new DescribeClientQuotasResponse(ApiKeys.DESCRIBE_CLIENT_QUOTAS.parseResponse(version, buffer), version);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index 0ed5125..5fdfbec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -77,9 +77,10 @@ public class SaslAuthenticateRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ ApiError apiError = ApiError.fromThrowable(e);
SaslAuthenticateResponseData response = new SaslAuthenticateResponseData()
- .setErrorCode(ApiError.fromThrowable(e).error().code())
- .setErrorMessage(e.getMessage());
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message());
return new SaslAuthenticateResponse(response);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c508dbc..50775b6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -139,6 +139,9 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
@@ -190,18 +193,21 @@ import static org.junit.Assert.fail;
public class RequestResponseTest {
+ // Exception includes a message that we verify is not included in error responses
+ private final UnknownServerException unknownServerException = new UnknownServerException("secret");
+
@Test
public void testSerialization() throws Exception {
checkRequest(createFindCoordinatorRequest(0), true);
checkRequest(createFindCoordinatorRequest(1), true);
- checkErrorResponse(createFindCoordinatorRequest(0), new UnknownServerException(), true);
- checkErrorResponse(createFindCoordinatorRequest(1), new UnknownServerException(), true);
+ checkErrorResponse(createFindCoordinatorRequest(0), unknownServerException, true);
+ checkErrorResponse(createFindCoordinatorRequest(1), unknownServerException, true);
checkResponse(createFindCoordinatorResponse(), 0, true);
checkResponse(createFindCoordinatorResponse(), 1, true);
checkRequest(createControlledShutdownRequest(), true);
checkResponse(createControlledShutdownResponse(), 1, true);
- checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException(), true);
- checkErrorResponse(createControlledShutdownRequest(0), new UnknownServerException(), true);
+ checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true);
+ checkErrorResponse(createControlledShutdownRequest(0), unknownServerException, true);
checkRequest(createFetchRequest(4), true);
checkResponse(createFetchResponse(), 4, true);
List<TopicPartition> toForgetTopics = new ArrayList<>();
@@ -211,53 +217,53 @@ public class RequestResponseTest {
checkRequest(createFetchRequest(7, new FetchMetadata(123, 456), toForgetTopics), true);
checkResponse(createFetchResponse(123), 7, true);
checkResponse(createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123), 7, true);
- checkErrorResponse(createFetchRequest(4), new UnknownServerException(), true);
+ checkErrorResponse(createFetchRequest(4), unknownServerException, true);
checkRequest(createHeartBeatRequest(), true);
- checkErrorResponse(createHeartBeatRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createHeartBeatRequest(), unknownServerException, true);
checkResponse(createHeartBeatResponse(), 0, true);
for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) {
checkRequest(createJoinGroupRequest(v), true);
- checkErrorResponse(createJoinGroupRequest(v), new UnknownServerException(), true);
+ checkErrorResponse(createJoinGroupRequest(v), unknownServerException, true);
checkResponse(createJoinGroupResponse(v), v, true);
}
for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) {
checkRequest(createSyncGroupRequest(v), true);
- checkErrorResponse(createSyncGroupRequest(v), new UnknownServerException(), true);
+ checkErrorResponse(createSyncGroupRequest(v), unknownServerException, true);
checkResponse(createSyncGroupResponse(v), v, true);
}
checkRequest(createLeaveGroupRequest(), true);
- checkErrorResponse(createLeaveGroupRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createLeaveGroupRequest(), unknownServerException, true);
checkResponse(createLeaveGroupResponse(), 0, true);
for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) {
checkRequest(createListGroupsRequest(v), false);
- checkErrorResponse(createListGroupsRequest(v), new UnknownServerException(), true);
+ checkErrorResponse(createListGroupsRequest(v), unknownServerException, true);
checkResponse(createListGroupsResponse(v), v, true);
}
checkRequest(createDescribeGroupRequest(), true);
- checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createDescribeGroupRequest(), unknownServerException, true);
checkResponse(createDescribeGroupResponse(), 0, true);
checkRequest(createDeleteGroupsRequest(), true);
- checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true);
checkResponse(createDeleteGroupsResponse(), 0, true);
for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) {
checkRequest(createListOffsetRequest(i), true);
- checkErrorResponse(createListOffsetRequest(i), new UnknownServerException(), true);
+ checkErrorResponse(createListOffsetRequest(i), unknownServerException, true);
checkResponse(createListOffsetResponse(i), i, true);
}
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
- checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true);
checkResponse(createMetadataResponse(), 2, true);
- checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), unknownServerException, true);
checkResponse(createMetadataResponse(), 3, true);
- checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), unknownServerException, true);
checkResponse(createMetadataResponse(), 4, true);
- checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), unknownServerException, true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true);
checkErrorResponse(createOffsetFetchRequestForAllPartition("group1", false), new NotCoordinatorException("Not Coordinator"), true);
@@ -268,42 +274,42 @@ public class RequestResponseTest {
checkRequest(createOffsetFetchRequest(7, true), true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true);
checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true);
- checkErrorResponse(createOffsetFetchRequest(0, false), new UnknownServerException(), true);
- checkErrorResponse(createOffsetFetchRequest(1, false), new UnknownServerException(), true);
- checkErrorResponse(createOffsetFetchRequest(2, false), new UnknownServerException(), true);
- checkErrorResponse(createOffsetFetchRequest(7, true), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetFetchRequest(0, false), unknownServerException, true);
+ checkErrorResponse(createOffsetFetchRequest(1, false), unknownServerException, true);
+ checkErrorResponse(createOffsetFetchRequest(2, false), unknownServerException, true);
+ checkErrorResponse(createOffsetFetchRequest(7, true), unknownServerException, true);
checkResponse(createOffsetFetchResponse(), 0, true);
checkRequest(createProduceRequest(2), true);
- checkErrorResponse(createProduceRequest(2), new UnknownServerException(), true);
+ checkErrorResponse(createProduceRequest(2), unknownServerException, true);
checkRequest(createProduceRequest(3), true);
- checkErrorResponse(createProduceRequest(3), new UnknownServerException(), true);
+ checkErrorResponse(createProduceRequest(3), unknownServerException, true);
checkResponse(createProduceResponse(), 2, true);
checkResponse(createProduceResponseWithErrorMessage(), 8, true);
for (int v = ApiKeys.STOP_REPLICA.oldestVersion(); v <= ApiKeys.STOP_REPLICA.latestVersion(); v++) {
checkRequest(createStopReplicaRequest(v, true), true);
checkRequest(createStopReplicaRequest(v, false), true);
- checkErrorResponse(createStopReplicaRequest(v, true), new UnknownServerException(), true);
- checkErrorResponse(createStopReplicaRequest(v, false), new UnknownServerException(), true);
+ checkErrorResponse(createStopReplicaRequest(v, true), unknownServerException, true);
+ checkErrorResponse(createStopReplicaRequest(v, false), unknownServerException, true);
checkResponse(createStopReplicaResponse(), v, true);
}
checkRequest(createLeaderAndIsrRequest(0), true);
- checkErrorResponse(createLeaderAndIsrRequest(0), new UnknownServerException(), false);
+ checkErrorResponse(createLeaderAndIsrRequest(0), unknownServerException, false);
checkRequest(createLeaderAndIsrRequest(1), true);
- checkErrorResponse(createLeaderAndIsrRequest(1), new UnknownServerException(), false);
+ checkErrorResponse(createLeaderAndIsrRequest(1), unknownServerException, false);
checkRequest(createLeaderAndIsrRequest(2), true);
- checkErrorResponse(createLeaderAndIsrRequest(2), new UnknownServerException(), false);
+ checkErrorResponse(createLeaderAndIsrRequest(2), unknownServerException, false);
checkResponse(createLeaderAndIsrResponse(), 0, true);
checkRequest(createSaslHandshakeRequest(), true);
- checkErrorResponse(createSaslHandshakeRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createSaslHandshakeRequest(), unknownServerException, true);
checkResponse(createSaslHandshakeResponse(), 0, true);
checkRequest(createSaslAuthenticateRequest(), true);
- checkErrorResponse(createSaslAuthenticateRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createSaslAuthenticateRequest(), unknownServerException, true);
checkResponse(createSaslAuthenticateResponse(), 0, true);
checkResponse(createSaslAuthenticateResponse(), 1, true);
checkRequest(createApiVersionRequest(), true);
- checkErrorResponse(createApiVersionRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createApiVersionRequest(), unknownServerException, true);
checkErrorResponse(createApiVersionRequest(), new UnsupportedVersionException("Not Supported"), true);
checkResponse(createApiVersionResponse(), 0, true);
checkResponse(createApiVersionResponse(), 1, true);
@@ -315,107 +321,107 @@ public class RequestResponseTest {
checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 3, true);
checkRequest(createCreateTopicRequest(0), true);
- checkErrorResponse(createCreateTopicRequest(0), new UnknownServerException(), true);
+ checkErrorResponse(createCreateTopicRequest(0), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 0, true);
checkRequest(createCreateTopicRequest(1), true);
- checkErrorResponse(createCreateTopicRequest(1), new UnknownServerException(), true);
+ checkErrorResponse(createCreateTopicRequest(1), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 1, true);
checkRequest(createCreateTopicRequest(2), true);
- checkErrorResponse(createCreateTopicRequest(2), new UnknownServerException(), true);
+ checkErrorResponse(createCreateTopicRequest(2), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 2, true);
checkRequest(createCreateTopicRequest(3), true);
- checkErrorResponse(createCreateTopicRequest(3), new UnknownServerException(), true);
+ checkErrorResponse(createCreateTopicRequest(3), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 3, true);
checkRequest(createCreateTopicRequest(4), true);
- checkErrorResponse(createCreateTopicRequest(4), new UnknownServerException(), true);
+ checkErrorResponse(createCreateTopicRequest(4), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 4, true);
checkRequest(createCreateTopicRequest(5), true);
- checkErrorResponse(createCreateTopicRequest(5), new UnknownServerException(), true);
+ checkErrorResponse(createCreateTopicRequest(5), unknownServerException, true);
checkResponse(createCreateTopicResponse(), 5, true);
checkRequest(createDeleteTopicsRequest(), true);
- checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createDeleteTopicsRequest(), unknownServerException, true);
checkResponse(createDeleteTopicsResponse(), 0, true);
checkRequest(createInitPidRequest(), true);
- checkErrorResponse(createInitPidRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createInitPidRequest(), unknownServerException, true);
checkResponse(createInitPidResponse(), 0, true);
checkRequest(createAddPartitionsToTxnRequest(), true);
checkResponse(createAddPartitionsToTxnResponse(), 0, true);
- checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true);
checkRequest(createAddOffsetsToTxnRequest(), true);
checkResponse(createAddOffsetsToTxnResponse(), 0, true);
- checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true);
checkRequest(createEndTxnRequest(), true);
checkResponse(createEndTxnResponse(), 0, true);
- checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createEndTxnRequest(), unknownServerException, true);
checkRequest(createWriteTxnMarkersRequest(), true);
checkResponse(createWriteTxnMarkersResponse(), 0, true);
- checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true);
checkOlderFetchVersions();
checkResponse(createMetadataResponse(), 0, true);
checkResponse(createMetadataResponse(), 1, true);
- checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
+ checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true);
checkRequest(createOffsetCommitRequest(0), true);
- checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetCommitRequest(0), unknownServerException, true);
checkRequest(createOffsetCommitRequest(1), true);
- checkErrorResponse(createOffsetCommitRequest(1), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetCommitRequest(1), unknownServerException, true);
checkRequest(createOffsetCommitRequest(2), true);
- checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetCommitRequest(2), unknownServerException, true);
checkRequest(createOffsetCommitRequest(3), true);
- checkErrorResponse(createOffsetCommitRequest(3), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetCommitRequest(3), unknownServerException, true);
checkRequest(createOffsetCommitRequest(4), true);
- checkErrorResponse(createOffsetCommitRequest(4), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetCommitRequest(4), unknownServerException, true);
checkResponse(createOffsetCommitResponse(), 4, true);
checkRequest(createOffsetCommitRequest(5), true);
- checkErrorResponse(createOffsetCommitRequest(5), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetCommitRequest(5), unknownServerException, true);
checkResponse(createOffsetCommitResponse(), 5, true);
checkRequest(createJoinGroupRequest(0), true);
checkRequest(createUpdateMetadataRequest(0, null), false);
- checkErrorResponse(createUpdateMetadataRequest(0, null), new UnknownServerException(), true);
+ checkErrorResponse(createUpdateMetadataRequest(0, null), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(1, null), false);
checkRequest(createUpdateMetadataRequest(1, "rack1"), false);
- checkErrorResponse(createUpdateMetadataRequest(1, null), new UnknownServerException(), true);
+ checkErrorResponse(createUpdateMetadataRequest(1, null), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(2, "rack1"), false);
checkRequest(createUpdateMetadataRequest(2, null), false);
- checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), new UnknownServerException(), true);
+ checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(3, "rack1"), false);
checkRequest(createUpdateMetadataRequest(3, null), false);
- checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), new UnknownServerException(), true);
+ checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(4, "rack1"), false);
checkRequest(createUpdateMetadataRequest(4, null), false);
- checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), new UnknownServerException(), true);
+ checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), unknownServerException, true);
checkRequest(createUpdateMetadataRequest(5, "rack1"), false);
checkRequest(createUpdateMetadataRequest(5, null), false);
- checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), new UnknownServerException(), true);
+ checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true);
checkResponse(createUpdateMetadataResponse(), 0, true);
checkRequest(createListOffsetRequest(0), true);
- checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true);
+ checkErrorResponse(createListOffsetRequest(0), unknownServerException, true);
checkResponse(createListOffsetResponse(0), 0, true);
checkRequest(createLeaderEpochRequestForReplica(0, 1), true);
checkRequest(createLeaderEpochRequestForConsumer(), true);
checkResponse(createLeaderEpochResponse(), 0, true);
- checkErrorResponse(createLeaderEpochRequestForConsumer(), new UnknownServerException(), true);
+ checkErrorResponse(createLeaderEpochRequestForConsumer(), unknownServerException, true);
checkRequest(createAddPartitionsToTxnRequest(), true);
- checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true);
checkResponse(createAddPartitionsToTxnResponse(), 0, true);
checkRequest(createAddOffsetsToTxnRequest(), true);
- checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true);
checkResponse(createAddOffsetsToTxnResponse(), 0, true);
checkRequest(createEndTxnRequest(), true);
- checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createEndTxnRequest(), unknownServerException, true);
checkResponse(createEndTxnResponse(), 0, true);
checkRequest(createWriteTxnMarkersRequest(), true);
- checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true);
checkResponse(createWriteTxnMarkersResponse(), 0, true);
checkRequest(createTxnOffsetCommitRequest(0), true);
checkRequest(createTxnOffsetCommitRequest(3), true);
checkRequest(createTxnOffsetCommitRequestWithAutoDowngrade(2), true);
- checkErrorResponse(createTxnOffsetCommitRequest(0), new UnknownServerException(), true);
- checkErrorResponse(createTxnOffsetCommitRequest(3), new UnknownServerException(), true);
- checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), new UnknownServerException(), true);
+ checkErrorResponse(createTxnOffsetCommitRequest(0), unknownServerException, true);
+ checkErrorResponse(createTxnOffsetCommitRequest(3), unknownServerException, true);
+ checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), unknownServerException, true);
checkResponse(createTxnOffsetCommitResponse(), 0, true);
checkRequest(createDescribeAclsRequest(), true);
checkErrorResponse(createDescribeAclsRequest(), new SecurityDisabledException("Security is not enabled."), true);
@@ -427,18 +433,18 @@ public class RequestResponseTest {
checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled."), true);
checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion(), true);
checkRequest(createAlterConfigsRequest(), false);
- checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createAlterConfigsRequest(), unknownServerException, true);
checkResponse(createAlterConfigsResponse(), 0, false);
checkRequest(createDescribeConfigsRequest(0), true);
checkRequest(createDescribeConfigsRequestWithConfigEntries(0), false);
- checkErrorResponse(createDescribeConfigsRequest(0), new UnknownServerException(), true);
+ checkErrorResponse(createDescribeConfigsRequest(0), unknownServerException, true);
checkResponse(createDescribeConfigsResponse(), 0, false);
checkRequest(createDescribeConfigsRequest(1), true);
checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false);
checkRequest(createDescribeConfigsRequestWithDocumentation(1), false);
checkRequest(createDescribeConfigsRequestWithDocumentation(2), false);
checkRequest(createDescribeConfigsRequestWithDocumentation(3), false);
- checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException(), true);
+ checkErrorResponse(createDescribeConfigsRequest(1), unknownServerException, true);
checkResponse(createDescribeConfigsResponse(), 1, false);
checkDescribeConfigsResponseVersions();
checkRequest(createCreatePartitionsRequest(), true);
@@ -446,33 +452,40 @@ public class RequestResponseTest {
checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException(), true);
checkResponse(createCreatePartitionsResponse(), 0, true);
checkRequest(createCreateTokenRequest(), true);
- checkErrorResponse(createCreateTokenRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createCreateTokenRequest(), unknownServerException, true);
checkResponse(createCreateTokenResponse(), 0, true);
checkRequest(createDescribeTokenRequest(), true);
- checkErrorResponse(createDescribeTokenRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createDescribeTokenRequest(), unknownServerException, true);
checkResponse(createDescribeTokenResponse(), 0, true);
checkRequest(createExpireTokenRequest(), true);
- checkErrorResponse(createExpireTokenRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createExpireTokenRequest(), unknownServerException, true);
checkResponse(createExpireTokenResponse(), 0, true);
checkRequest(createRenewTokenRequest(), true);
- checkErrorResponse(createRenewTokenRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createRenewTokenRequest(), unknownServerException, true);
checkResponse(createRenewTokenResponse(), 0, true);
checkRequest(createElectLeadersRequest(), true);
checkRequest(createElectLeadersRequestNullPartitions(), true);
- checkErrorResponse(createElectLeadersRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createElectLeadersRequest(), unknownServerException, true);
checkResponse(createElectLeadersResponse(), 1, true);
checkRequest(createIncrementalAlterConfigsRequest(), true);
- checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createIncrementalAlterConfigsRequest(), unknownServerException, true);
checkResponse(createIncrementalAlterConfigsResponse(), 0, true);
checkRequest(createAlterPartitionReassignmentsRequest(), true);
- checkErrorResponse(createAlterPartitionReassignmentsRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createAlterPartitionReassignmentsRequest(), unknownServerException, true);
checkResponse(createAlterPartitionReassignmentsResponse(), 0, true);
checkRequest(createListPartitionReassignmentsRequest(), true);
- checkErrorResponse(createListPartitionReassignmentsRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createListPartitionReassignmentsRequest(), unknownServerException, true);
checkResponse(createListPartitionReassignmentsResponse(), 0, true);
checkRequest(createOffsetDeleteRequest(), true);
- checkErrorResponse(createOffsetDeleteRequest(), new UnknownServerException(), true);
+ checkErrorResponse(createOffsetDeleteRequest(), unknownServerException, true);
checkResponse(createOffsetDeleteResponse(), 0, true);
+
+ checkRequest(createDescribeClientQuotasRequest(), true);
+ checkErrorResponse(createDescribeClientQuotasRequest(), unknownServerException, true);
+ checkResponse(createDescribeClientQuotasResponse(), 0, true);
+ checkRequest(createAlterClientQuotasRequest(), true);
+ checkErrorResponse(createAlterClientQuotasRequest(), unknownServerException, true);
+ checkResponse(createAlterClientQuotasResponse(), 0, true);
}
@Test
@@ -486,7 +499,7 @@ public class RequestResponseTest {
private void checkOlderFetchVersions() throws Exception {
int latestVersion = FETCH.latestVersion();
for (int i = 0; i < latestVersion; ++i) {
- checkErrorResponse(createFetchRequest(i), new UnknownServerException(), true);
+ checkErrorResponse(createFetchRequest(i), unknownServerException, true);
checkRequest(createFetchRequest(i), true);
checkResponse(createFetchResponse(), i, true);
}
@@ -532,7 +545,13 @@ public class RequestResponseTest {
}
private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
- checkResponse(req.getErrorResponse(e), req.version(), checkEqualityAndHashCode);
+ AbstractResponse response = req.getErrorResponse(e);
+ checkResponse(response, req.version(), checkEqualityAndHashCode);
+ if (e instanceof UnknownServerException) {
+ String responseStr = response.toStruct(req.version()).toString();
+ assertFalse(String.format("Unknown message included in response for %s: %s ", req.api, responseStr),
+ responseStr.contains(e.getMessage()));
+ }
}
private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) {
@@ -2234,4 +2253,25 @@ public class RequestResponseTest {
return new OffsetDeleteResponse(data);
}
+ private DescribeClientQuotasRequest createDescribeClientQuotasRequest() {
+ ClientQuotaFilter filter = ClientQuotaFilter.all();
+ return new DescribeClientQuotasRequest.Builder(filter).build((short) 0);
+ }
+
+ private DescribeClientQuotasResponse createDescribeClientQuotasResponse() {
+ ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+ return new DescribeClientQuotasResponse(Collections.singletonMap(entity, Collections.singletonMap("request_percentage", 1.0)), 0);
+ }
+
+ private AlterClientQuotasRequest createAlterClientQuotasRequest() {
+ ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+ ClientQuotaAlteration.Op op = new ClientQuotaAlteration.Op("request_percentage", 2.0);
+ ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, Collections.singleton(op));
+ return new AlterClientQuotasRequest.Builder(Collections.singleton(alteration), false).build((short) 0);
+ }
+
+ private AlterClientQuotasResponse createAlterClientQuotasResponse() {
+ ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+ return new AlterClientQuotasResponse(Collections.singletonMap(entity, ApiError.NONE), 0);
+ }
}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index fdc83fd..3f968ef 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -23,6 +23,7 @@ import java.util.{Collections, Properties}
import joptsimple._
import kafka.common.Config
import kafka.log.LogConfig
+import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncoder}
import kafka.utils.Implicits._
@@ -364,6 +365,14 @@ object ConfigCommand extends Config {
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case ConfigType.User | ConfigType.Client =>
+ val nonQuotaConfigsToAdd = configsToBeAdded.keys.filterNot(QuotaConfigs.isQuotaConfig)
+ if (nonQuotaConfigsToAdd.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota configs can be added for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToAdd")
+ val nonQuotaConfigsToDelete = configsToBeDeleted.filterNot(QuotaConfigs.isQuotaConfig)
+ if (nonQuotaConfigsToDelete.nonEmpty)
+ throw new IllegalArgumentException(s"Only quota configs can be deleted for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToDelete")
+
+
val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames)
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 183a5d3..5fc411d 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
import kafka.utils.Log4jController
import kafka.metrics.KafkaMetricsGroup
+import kafka.server.DynamicConfig.QuotaConfigs
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.AlterConfigOp
@@ -864,19 +865,20 @@ class AdminManager(val config: KafkaConfig,
!name.isDefined || !strict
}
- def fromProps(props: Properties): Map[String, Double] = {
- props.asScala.map { case (key, value) =>
+ def fromProps(props: Map[String, String]): Map[String, Double] = {
+ props.map { case (key, value) =>
val doubleValue = try value.toDouble catch {
case _: NumberFormatException =>
- throw new IllegalStateException(s"Unexpected client quota configuration value: ${key} -> ${value}")
+ throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
}
(key -> doubleValue)
}
}
(userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
- if (!p.isEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
- Some((userClientIdToEntity(u, c) -> fromProps(p)))
+ val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isQuotaConfig(key) }
+ if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
+ Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
else
None
}.flatten.toMap
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 13c64bf..f3d40a1 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -67,11 +67,20 @@ object DynamicConfig {
def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
}
- object Client {
- //Properties
+ object QuotaConfigs {
val ProducerByteRateOverrideProp = "producer_byte_rate"
val ConsumerByteRateOverrideProp = "consumer_byte_rate"
val RequestPercentageOverrideProp = "request_percentage"
+ private val configNames = Set(ProducerByteRateOverrideProp, ConsumerByteRateOverrideProp, RequestPercentageOverrideProp)
+
+ def isQuotaConfig(name: String): Boolean = configNames.contains(name)
+ }
+
+ object Client {
+ //Properties
+ val ProducerByteRateOverrideProp = QuotaConfigs.ProducerByteRateOverrideProp
+ val ConsumerByteRateOverrideProp = QuotaConfigs.ConsumerByteRateOverrideProp
+ val RequestPercentageOverrideProp = QuotaConfigs.RequestPercentageOverrideProp
//Defaults
val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 2b357b1..d412123 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -486,6 +486,27 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+ def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node)
+
+ def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+ val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+ "--entity-type", entityType, "--entity-name", "admin",
+ "--alter") ++ alterOpts)
+ val e = intercept[IllegalArgumentException] {
+ ConfigCommand.alterConfig(mockAdminClient, opts)
+ }
+ assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config"))
+ }
+
+ verifyCommand("users", "--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000,some_config=10")
+ verifyCommand("clients", "--add-config", "some_config=10")
+ verifyCommand("users", "--delete-config", "consumer_byte_rate=20000,some_config=10")
+ verifyCommand("clients", "--delete-config", "some_config=10")
+ }
+
+ @Test
def shouldAddTopicConfigUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "my-topic",
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index b5c694a..6ddfd3b 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -23,9 +23,10 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.junit.Assert._
import org.junit.Test
-
import java.util.concurrent.{ExecutionException, TimeUnit}
+import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
+
import scala.jdk.CollectionConverters._
class ClientQuotasRequestTest extends BaseRequestTest {
@@ -37,6 +38,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
@Test
def testAlterClientQuotasRequest(): Unit = {
+
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava)
// Expect an empty configuration.
@@ -162,6 +164,32 @@ class ClientQuotasRequestTest extends BaseRequestTest {
))
}
+ @Test
+ def testClientQuotasForScramUsers(): Unit = {
+ val entityType = ConfigType.User
+ val userName = "user"
+
+ val mechanism = ScramMechanism.SCRAM_SHA_256
+ val credential = new ScramFormatter(mechanism).generateCredential("password", 4096)
+ val configs = adminZkClient.fetchEntityConfig(entityType, userName)
+ configs.setProperty(mechanism.mechanismName, ScramCredentialUtils.credentialToString(credential))
+ adminZkClient.changeConfigs(entityType, userName, configs)
+
+ val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)
+
+ verifyDescribeEntityQuotas(entity, Map.empty)
+
+ alterEntityQuotas(entity, Map(
+ (ProducerByteRateProp -> Some(10000.0)),
+ (ConsumerByteRateProp -> Some(20000.0))
+ ), validateOnly = false)
+
+ verifyDescribeEntityQuotas(entity, Map(
+ (ProducerByteRateProp -> 10000.0),
+ (ConsumerByteRateProp -> 20000.0)
+ ))
+ }
+
@Test(expected = classOf[InvalidRequestException])
def testAlterClientQuotasBadUser(): Unit = {
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)