You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/07/17 08:26:17 UTC

[kafka] branch 2.6 updated: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new dd71437  MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030)
dd71437 is described below

commit dd71437de7675d92ad3e4ed01ac3ee11bf5da99d
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 17 08:55:53 2020 +0100

    MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, David Jacot <dj...@confluent.io>, Ron Dagostino <rd...@confluent.io>
---
 .../common/requests/AlterClientQuotasResponse.java |  13 +-
 .../org/apache/kafka/common/requests/ApiError.java |   5 +-
 .../requests/DescribeClientQuotasResponse.java     |  11 +-
 .../common/requests/SaslAuthenticateRequest.java   |   5 +-
 .../kafka/common/requests/RequestResponseTest.java | 198 +++++++++++++--------
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   9 +
 .../src/main/scala/kafka/server/AdminManager.scala |  12 +-
 .../main/scala/kafka/server/DynamicConfig.scala    |  13 +-
 .../scala/unit/kafka/admin/ConfigCommandTest.scala |  21 +++
 .../kafka/server/ClientQuotasRequestTest.scala     |  30 +++-
 10 files changed, 220 insertions(+), 97 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
index a3b01bf..3e95671 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
@@ -20,10 +20,12 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.AlterClientQuotasResponseData;
 import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntityData;
 import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -50,14 +52,13 @@ public class AlterClientQuotasResponse extends AbstractResponse {
     }
 
     public AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int throttleTimeMs, Throwable e) {
-        short errorCode = Errors.forException(e).code();
-        String errorMessage = e.getMessage();
+        ApiError apiError = ApiError.fromThrowable(e);
 
         List<EntryData> entries = new ArrayList<>(entities.size());
         for (ClientQuotaEntity entity : entities) {
             entries.add(new EntryData()
-                    .setErrorCode(errorCode)
-                    .setErrorMessage(errorMessage)
+                    .setErrorCode(apiError.error().code())
+                    .setErrorMessage(apiError.message())
                     .setEntity(toEntityData(entity)));
         }
 
@@ -120,4 +121,8 @@ public class AlterClientQuotasResponse extends AbstractResponse {
         }
         return entityData;
     }
+
+    public static AlterClientQuotasResponse parse(ByteBuffer buffer, short version) {
+        return new AlterClientQuotasResponse(ApiKeys.ALTER_CLIENT_QUOTAS.parseResponse(version, buffer), version);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 6cb09f0..5c9ca7b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -38,9 +38,10 @@ public class ApiError {
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
-        // Avoid populating the error message if it's a generic one
+        // Avoid populating the error message if it's a generic one. Also don't populate error
+        // message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information.
         Errors error = Errors.forException(t);
-        String message = error.message().equals(t.getMessage()) ? null : t.getMessage();
+        String message = error == Errors.UNKNOWN_SERVER_ERROR || error.message().equals(t.getMessage()) ? null : t.getMessage();
         return new ApiError(error, message);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
index cb54b01..bda3673 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
 import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
 import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
 import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -66,10 +68,11 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
     }
 
     public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
         this.data = new DescribeClientQuotasResponseData()
                 .setThrottleTimeMs(throttleTimeMs)
-                .setErrorCode(Errors.forException(e).code())
-                .setErrorMessage(e.getMessage())
+                .setErrorCode(apiError.error().code())
+                .setErrorMessage(apiError.message())
                 .setEntries(null);
     }
 
@@ -115,4 +118,8 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
     protected Struct toStruct(short version) {
         return data.toStruct(version);
     }
+
+    public static DescribeClientQuotasResponse parse(ByteBuffer buffer, short version) {
+        return new DescribeClientQuotasResponse(ApiKeys.DESCRIBE_CLIENT_QUOTAS.parseResponse(version, buffer), version);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index 0ed5125..5fdfbec 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -77,9 +77,10 @@ public class SaslAuthenticateRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
         SaslAuthenticateResponseData response = new SaslAuthenticateResponseData()
-                .setErrorCode(ApiError.fromThrowable(e).error().code())
-                .setErrorMessage(e.getMessage());
+                .setErrorCode(apiError.error().code())
+                .setErrorMessage(apiError.message());
         return new SaslAuthenticateResponse(response);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c508dbc..50775b6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -139,6 +139,9 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
@@ -190,18 +193,21 @@ import static org.junit.Assert.fail;
 
 public class RequestResponseTest {
 
+    // Exception includes a message that we verify is not included in error responses
+    private final UnknownServerException unknownServerException = new UnknownServerException("secret");
+
     @Test
     public void testSerialization() throws Exception {
         checkRequest(createFindCoordinatorRequest(0), true);
         checkRequest(createFindCoordinatorRequest(1), true);
-        checkErrorResponse(createFindCoordinatorRequest(0), new UnknownServerException(), true);
-        checkErrorResponse(createFindCoordinatorRequest(1), new UnknownServerException(), true);
+        checkErrorResponse(createFindCoordinatorRequest(0), unknownServerException, true);
+        checkErrorResponse(createFindCoordinatorRequest(1), unknownServerException, true);
         checkResponse(createFindCoordinatorResponse(), 0, true);
         checkResponse(createFindCoordinatorResponse(), 1, true);
         checkRequest(createControlledShutdownRequest(), true);
         checkResponse(createControlledShutdownResponse(), 1, true);
-        checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException(), true);
-        checkErrorResponse(createControlledShutdownRequest(0), new UnknownServerException(), true);
+        checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true);
+        checkErrorResponse(createControlledShutdownRequest(0), unknownServerException, true);
         checkRequest(createFetchRequest(4), true);
         checkResponse(createFetchResponse(), 4, true);
         List<TopicPartition> toForgetTopics = new ArrayList<>();
@@ -211,53 +217,53 @@ public class RequestResponseTest {
         checkRequest(createFetchRequest(7, new FetchMetadata(123, 456), toForgetTopics), true);
         checkResponse(createFetchResponse(123), 7, true);
         checkResponse(createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123), 7, true);
-        checkErrorResponse(createFetchRequest(4), new UnknownServerException(), true);
+        checkErrorResponse(createFetchRequest(4), unknownServerException, true);
         checkRequest(createHeartBeatRequest(), true);
-        checkErrorResponse(createHeartBeatRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createHeartBeatRequest(), unknownServerException, true);
         checkResponse(createHeartBeatResponse(), 0, true);
 
         for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) {
             checkRequest(createJoinGroupRequest(v), true);
-            checkErrorResponse(createJoinGroupRequest(v), new UnknownServerException(), true);
+            checkErrorResponse(createJoinGroupRequest(v), unknownServerException, true);
             checkResponse(createJoinGroupResponse(v), v, true);
         }
 
         for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) {
             checkRequest(createSyncGroupRequest(v), true);
-            checkErrorResponse(createSyncGroupRequest(v), new UnknownServerException(), true);
+            checkErrorResponse(createSyncGroupRequest(v), unknownServerException, true);
             checkResponse(createSyncGroupResponse(v), v, true);
         }
 
         checkRequest(createLeaveGroupRequest(), true);
-        checkErrorResponse(createLeaveGroupRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createLeaveGroupRequest(), unknownServerException, true);
         checkResponse(createLeaveGroupResponse(), 0, true);
 
         for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) {
             checkRequest(createListGroupsRequest(v), false);
-            checkErrorResponse(createListGroupsRequest(v), new UnknownServerException(), true);
+            checkErrorResponse(createListGroupsRequest(v), unknownServerException, true);
             checkResponse(createListGroupsResponse(v), v, true);
         }
 
         checkRequest(createDescribeGroupRequest(), true);
-        checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createDescribeGroupRequest(), unknownServerException, true);
         checkResponse(createDescribeGroupResponse(), 0, true);
         checkRequest(createDeleteGroupsRequest(), true);
-        checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true);
         checkResponse(createDeleteGroupsResponse(), 0, true);
         for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) {
             checkRequest(createListOffsetRequest(i), true);
-            checkErrorResponse(createListOffsetRequest(i), new UnknownServerException(), true);
+            checkErrorResponse(createListOffsetRequest(i), unknownServerException, true);
             checkResponse(createListOffsetResponse(i), i, true);
         }
         checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
         checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
-        checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true);
         checkResponse(createMetadataResponse(), 2, true);
-        checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), unknownServerException, true);
         checkResponse(createMetadataResponse(), 3, true);
-        checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), unknownServerException, true);
         checkResponse(createMetadataResponse(), 4, true);
-        checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), unknownServerException, true);
         checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true);
         checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true);
         checkErrorResponse(createOffsetFetchRequestForAllPartition("group1", false), new NotCoordinatorException("Not Coordinator"), true);
@@ -268,42 +274,42 @@ public class RequestResponseTest {
         checkRequest(createOffsetFetchRequest(7, true), true);
         checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true);
         checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true);
-        checkErrorResponse(createOffsetFetchRequest(0, false), new UnknownServerException(), true);
-        checkErrorResponse(createOffsetFetchRequest(1, false), new UnknownServerException(), true);
-        checkErrorResponse(createOffsetFetchRequest(2, false), new UnknownServerException(), true);
-        checkErrorResponse(createOffsetFetchRequest(7, true), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetFetchRequest(0, false), unknownServerException, true);
+        checkErrorResponse(createOffsetFetchRequest(1, false), unknownServerException, true);
+        checkErrorResponse(createOffsetFetchRequest(2, false), unknownServerException, true);
+        checkErrorResponse(createOffsetFetchRequest(7, true), unknownServerException, true);
         checkResponse(createOffsetFetchResponse(), 0, true);
         checkRequest(createProduceRequest(2), true);
-        checkErrorResponse(createProduceRequest(2), new UnknownServerException(), true);
+        checkErrorResponse(createProduceRequest(2), unknownServerException, true);
         checkRequest(createProduceRequest(3), true);
-        checkErrorResponse(createProduceRequest(3), new UnknownServerException(), true);
+        checkErrorResponse(createProduceRequest(3), unknownServerException, true);
         checkResponse(createProduceResponse(), 2, true);
         checkResponse(createProduceResponseWithErrorMessage(), 8, true);
 
         for (int v = ApiKeys.STOP_REPLICA.oldestVersion(); v <= ApiKeys.STOP_REPLICA.latestVersion(); v++) {
             checkRequest(createStopReplicaRequest(v, true), true);
             checkRequest(createStopReplicaRequest(v, false), true);
-            checkErrorResponse(createStopReplicaRequest(v, true), new UnknownServerException(), true);
-            checkErrorResponse(createStopReplicaRequest(v, false), new UnknownServerException(), true);
+            checkErrorResponse(createStopReplicaRequest(v, true), unknownServerException, true);
+            checkErrorResponse(createStopReplicaRequest(v, false), unknownServerException, true);
             checkResponse(createStopReplicaResponse(), v, true);
         }
 
         checkRequest(createLeaderAndIsrRequest(0), true);
-        checkErrorResponse(createLeaderAndIsrRequest(0), new UnknownServerException(), false);
+        checkErrorResponse(createLeaderAndIsrRequest(0), unknownServerException, false);
         checkRequest(createLeaderAndIsrRequest(1), true);
-        checkErrorResponse(createLeaderAndIsrRequest(1), new UnknownServerException(), false);
+        checkErrorResponse(createLeaderAndIsrRequest(1), unknownServerException, false);
         checkRequest(createLeaderAndIsrRequest(2), true);
-        checkErrorResponse(createLeaderAndIsrRequest(2), new UnknownServerException(), false);
+        checkErrorResponse(createLeaderAndIsrRequest(2), unknownServerException, false);
         checkResponse(createLeaderAndIsrResponse(), 0, true);
         checkRequest(createSaslHandshakeRequest(), true);
-        checkErrorResponse(createSaslHandshakeRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createSaslHandshakeRequest(), unknownServerException, true);
         checkResponse(createSaslHandshakeResponse(), 0, true);
         checkRequest(createSaslAuthenticateRequest(), true);
-        checkErrorResponse(createSaslAuthenticateRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createSaslAuthenticateRequest(), unknownServerException, true);
         checkResponse(createSaslAuthenticateResponse(), 0, true);
         checkResponse(createSaslAuthenticateResponse(), 1, true);
         checkRequest(createApiVersionRequest(), true);
-        checkErrorResponse(createApiVersionRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createApiVersionRequest(), unknownServerException, true);
         checkErrorResponse(createApiVersionRequest(), new UnsupportedVersionException("Not Supported"), true);
         checkResponse(createApiVersionResponse(), 0, true);
         checkResponse(createApiVersionResponse(), 1, true);
@@ -315,107 +321,107 @@ public class RequestResponseTest {
         checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 3, true);
 
         checkRequest(createCreateTopicRequest(0), true);
-        checkErrorResponse(createCreateTopicRequest(0), new UnknownServerException(), true);
+        checkErrorResponse(createCreateTopicRequest(0), unknownServerException, true);
         checkResponse(createCreateTopicResponse(), 0, true);
         checkRequest(createCreateTopicRequest(1), true);
-        checkErrorResponse(createCreateTopicRequest(1), new UnknownServerException(), true);
+        checkErrorResponse(createCreateTopicRequest(1), unknownServerException, true);
         checkResponse(createCreateTopicResponse(), 1, true);
         checkRequest(createCreateTopicRequest(2), true);
-        checkErrorResponse(createCreateTopicRequest(2), new UnknownServerException(), true);
+        checkErrorResponse(createCreateTopicRequest(2), unknownServerException, true);
         checkResponse(createCreateTopicResponse(), 2, true);
         checkRequest(createCreateTopicRequest(3), true);
-        checkErrorResponse(createCreateTopicRequest(3), new UnknownServerException(), true);
+        checkErrorResponse(createCreateTopicRequest(3), unknownServerException, true);
         checkResponse(createCreateTopicResponse(), 3, true);
         checkRequest(createCreateTopicRequest(4), true);
-        checkErrorResponse(createCreateTopicRequest(4), new UnknownServerException(), true);
+        checkErrorResponse(createCreateTopicRequest(4), unknownServerException, true);
         checkResponse(createCreateTopicResponse(), 4, true);
         checkRequest(createCreateTopicRequest(5), true);
-        checkErrorResponse(createCreateTopicRequest(5), new UnknownServerException(), true);
+        checkErrorResponse(createCreateTopicRequest(5), unknownServerException, true);
         checkResponse(createCreateTopicResponse(), 5, true);
 
         checkRequest(createDeleteTopicsRequest(), true);
-        checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createDeleteTopicsRequest(), unknownServerException, true);
         checkResponse(createDeleteTopicsResponse(), 0, true);
 
         checkRequest(createInitPidRequest(), true);
-        checkErrorResponse(createInitPidRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createInitPidRequest(), unknownServerException, true);
         checkResponse(createInitPidResponse(), 0, true);
 
         checkRequest(createAddPartitionsToTxnRequest(), true);
         checkResponse(createAddPartitionsToTxnResponse(), 0, true);
-        checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true);
         checkRequest(createAddOffsetsToTxnRequest(), true);
         checkResponse(createAddOffsetsToTxnResponse(), 0, true);
-        checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true);
         checkRequest(createEndTxnRequest(), true);
         checkResponse(createEndTxnResponse(), 0, true);
-        checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createEndTxnRequest(), unknownServerException, true);
         checkRequest(createWriteTxnMarkersRequest(), true);
         checkResponse(createWriteTxnMarkersResponse(), 0, true);
-        checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true);
 
         checkOlderFetchVersions();
         checkResponse(createMetadataResponse(), 0, true);
         checkResponse(createMetadataResponse(), 1, true);
-        checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
+        checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true);
         checkRequest(createOffsetCommitRequest(0), true);
-        checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetCommitRequest(0), unknownServerException, true);
         checkRequest(createOffsetCommitRequest(1), true);
-        checkErrorResponse(createOffsetCommitRequest(1), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetCommitRequest(1), unknownServerException, true);
         checkRequest(createOffsetCommitRequest(2), true);
-        checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetCommitRequest(2), unknownServerException, true);
         checkRequest(createOffsetCommitRequest(3), true);
-        checkErrorResponse(createOffsetCommitRequest(3), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetCommitRequest(3), unknownServerException, true);
         checkRequest(createOffsetCommitRequest(4), true);
-        checkErrorResponse(createOffsetCommitRequest(4), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetCommitRequest(4), unknownServerException, true);
         checkResponse(createOffsetCommitResponse(), 4, true);
         checkRequest(createOffsetCommitRequest(5), true);
-        checkErrorResponse(createOffsetCommitRequest(5), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetCommitRequest(5), unknownServerException, true);
         checkResponse(createOffsetCommitResponse(), 5, true);
         checkRequest(createJoinGroupRequest(0), true);
         checkRequest(createUpdateMetadataRequest(0, null), false);
-        checkErrorResponse(createUpdateMetadataRequest(0, null), new UnknownServerException(), true);
+        checkErrorResponse(createUpdateMetadataRequest(0, null), unknownServerException, true);
         checkRequest(createUpdateMetadataRequest(1, null), false);
         checkRequest(createUpdateMetadataRequest(1, "rack1"), false);
-        checkErrorResponse(createUpdateMetadataRequest(1, null), new UnknownServerException(), true);
+        checkErrorResponse(createUpdateMetadataRequest(1, null), unknownServerException, true);
         checkRequest(createUpdateMetadataRequest(2, "rack1"), false);
         checkRequest(createUpdateMetadataRequest(2, null), false);
-        checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), new UnknownServerException(), true);
+        checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), unknownServerException, true);
         checkRequest(createUpdateMetadataRequest(3, "rack1"), false);
         checkRequest(createUpdateMetadataRequest(3, null), false);
-        checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), new UnknownServerException(), true);
+        checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), unknownServerException, true);
         checkRequest(createUpdateMetadataRequest(4, "rack1"), false);
         checkRequest(createUpdateMetadataRequest(4, null), false);
-        checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), new UnknownServerException(), true);
+        checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), unknownServerException, true);
         checkRequest(createUpdateMetadataRequest(5, "rack1"), false);
         checkRequest(createUpdateMetadataRequest(5, null), false);
-        checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), new UnknownServerException(), true);
+        checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true);
         checkResponse(createUpdateMetadataResponse(), 0, true);
         checkRequest(createListOffsetRequest(0), true);
-        checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true);
+        checkErrorResponse(createListOffsetRequest(0), unknownServerException, true);
         checkResponse(createListOffsetResponse(0), 0, true);
         checkRequest(createLeaderEpochRequestForReplica(0, 1), true);
         checkRequest(createLeaderEpochRequestForConsumer(), true);
         checkResponse(createLeaderEpochResponse(), 0, true);
-        checkErrorResponse(createLeaderEpochRequestForConsumer(), new UnknownServerException(), true);
+        checkErrorResponse(createLeaderEpochRequestForConsumer(), unknownServerException, true);
         checkRequest(createAddPartitionsToTxnRequest(), true);
-        checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true);
         checkResponse(createAddPartitionsToTxnResponse(), 0, true);
         checkRequest(createAddOffsetsToTxnRequest(), true);
-        checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true);
         checkResponse(createAddOffsetsToTxnResponse(), 0, true);
         checkRequest(createEndTxnRequest(), true);
-        checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createEndTxnRequest(), unknownServerException, true);
         checkResponse(createEndTxnResponse(), 0, true);
         checkRequest(createWriteTxnMarkersRequest(), true);
-        checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true);
         checkResponse(createWriteTxnMarkersResponse(), 0, true);
         checkRequest(createTxnOffsetCommitRequest(0), true);
         checkRequest(createTxnOffsetCommitRequest(3), true);
         checkRequest(createTxnOffsetCommitRequestWithAutoDowngrade(2), true);
-        checkErrorResponse(createTxnOffsetCommitRequest(0), new UnknownServerException(), true);
-        checkErrorResponse(createTxnOffsetCommitRequest(3), new UnknownServerException(), true);
-        checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), new UnknownServerException(), true);
+        checkErrorResponse(createTxnOffsetCommitRequest(0), unknownServerException, true);
+        checkErrorResponse(createTxnOffsetCommitRequest(3), unknownServerException, true);
+        checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), unknownServerException, true);
         checkResponse(createTxnOffsetCommitResponse(), 0, true);
         checkRequest(createDescribeAclsRequest(), true);
         checkErrorResponse(createDescribeAclsRequest(), new SecurityDisabledException("Security is not enabled."), true);
@@ -427,18 +433,18 @@ public class RequestResponseTest {
         checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled."), true);
         checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion(), true);
         checkRequest(createAlterConfigsRequest(), false);
-        checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createAlterConfigsRequest(), unknownServerException, true);
         checkResponse(createAlterConfigsResponse(), 0, false);
         checkRequest(createDescribeConfigsRequest(0), true);
         checkRequest(createDescribeConfigsRequestWithConfigEntries(0), false);
-        checkErrorResponse(createDescribeConfigsRequest(0), new UnknownServerException(), true);
+        checkErrorResponse(createDescribeConfigsRequest(0), unknownServerException, true);
         checkResponse(createDescribeConfigsResponse(), 0, false);
         checkRequest(createDescribeConfigsRequest(1), true);
         checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false);
         checkRequest(createDescribeConfigsRequestWithDocumentation(1), false);
         checkRequest(createDescribeConfigsRequestWithDocumentation(2), false);
         checkRequest(createDescribeConfigsRequestWithDocumentation(3), false);
-        checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException(), true);
+        checkErrorResponse(createDescribeConfigsRequest(1), unknownServerException, true);
         checkResponse(createDescribeConfigsResponse(), 1, false);
         checkDescribeConfigsResponseVersions();
         checkRequest(createCreatePartitionsRequest(), true);
@@ -446,33 +452,40 @@ public class RequestResponseTest {
         checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException(), true);
         checkResponse(createCreatePartitionsResponse(), 0, true);
         checkRequest(createCreateTokenRequest(), true);
-        checkErrorResponse(createCreateTokenRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createCreateTokenRequest(), unknownServerException, true);
         checkResponse(createCreateTokenResponse(), 0, true);
         checkRequest(createDescribeTokenRequest(), true);
-        checkErrorResponse(createDescribeTokenRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createDescribeTokenRequest(), unknownServerException, true);
         checkResponse(createDescribeTokenResponse(), 0, true);
         checkRequest(createExpireTokenRequest(), true);
-        checkErrorResponse(createExpireTokenRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createExpireTokenRequest(), unknownServerException, true);
         checkResponse(createExpireTokenResponse(), 0, true);
         checkRequest(createRenewTokenRequest(), true);
-        checkErrorResponse(createRenewTokenRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createRenewTokenRequest(), unknownServerException, true);
         checkResponse(createRenewTokenResponse(), 0, true);
         checkRequest(createElectLeadersRequest(), true);
         checkRequest(createElectLeadersRequestNullPartitions(), true);
-        checkErrorResponse(createElectLeadersRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createElectLeadersRequest(), unknownServerException, true);
         checkResponse(createElectLeadersResponse(), 1, true);
         checkRequest(createIncrementalAlterConfigsRequest(), true);
-        checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createIncrementalAlterConfigsRequest(), unknownServerException, true);
         checkResponse(createIncrementalAlterConfigsResponse(), 0, true);
         checkRequest(createAlterPartitionReassignmentsRequest(), true);
-        checkErrorResponse(createAlterPartitionReassignmentsRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createAlterPartitionReassignmentsRequest(), unknownServerException, true);
         checkResponse(createAlterPartitionReassignmentsResponse(), 0, true);
         checkRequest(createListPartitionReassignmentsRequest(), true);
-        checkErrorResponse(createListPartitionReassignmentsRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createListPartitionReassignmentsRequest(), unknownServerException, true);
         checkResponse(createListPartitionReassignmentsResponse(), 0, true);
         checkRequest(createOffsetDeleteRequest(), true);
-        checkErrorResponse(createOffsetDeleteRequest(), new UnknownServerException(), true);
+        checkErrorResponse(createOffsetDeleteRequest(), unknownServerException, true);
         checkResponse(createOffsetDeleteResponse(), 0, true);
+
+        checkRequest(createDescribeClientQuotasRequest(), true);
+        checkErrorResponse(createDescribeClientQuotasRequest(), unknownServerException, true);
+        checkResponse(createDescribeClientQuotasResponse(), 0, true);
+        checkRequest(createAlterClientQuotasRequest(), true);
+        checkErrorResponse(createAlterClientQuotasRequest(), unknownServerException, true);
+        checkResponse(createAlterClientQuotasResponse(), 0, true);
     }
 
     @Test
@@ -486,7 +499,7 @@ public class RequestResponseTest {
     private void checkOlderFetchVersions() throws Exception {
         int latestVersion = FETCH.latestVersion();
         for (int i = 0; i < latestVersion; ++i) {
-            checkErrorResponse(createFetchRequest(i), new UnknownServerException(), true);
+            checkErrorResponse(createFetchRequest(i), unknownServerException, true);
             checkRequest(createFetchRequest(i), true);
             checkResponse(createFetchResponse(), i, true);
         }
@@ -532,7 +545,13 @@ public class RequestResponseTest {
     }
 
     private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) {
-        checkResponse(req.getErrorResponse(e), req.version(), checkEqualityAndHashCode);
+        AbstractResponse response = req.getErrorResponse(e);
+        checkResponse(response, req.version(), checkEqualityAndHashCode);
+        if (e instanceof UnknownServerException) {
+            String responseStr = response.toStruct(req.version()).toString();
+            assertFalse(String.format("Unknown message included in response for %s: %s ", req.api, responseStr),
+                    responseStr.contains(e.getMessage()));
+        }
     }
 
     private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) {
@@ -2234,4 +2253,25 @@ public class RequestResponseTest {
         return new OffsetDeleteResponse(data);
     }
 
+    private DescribeClientQuotasRequest createDescribeClientQuotasRequest() {
+        ClientQuotaFilter filter = ClientQuotaFilter.all();
+        return new DescribeClientQuotasRequest.Builder(filter).build((short) 0);
+    }
+
+    private DescribeClientQuotasResponse createDescribeClientQuotasResponse() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+        return new DescribeClientQuotasResponse(Collections.singletonMap(entity, Collections.singletonMap("request_percentage", 1.0)), 0);
+    }
+
+    private AlterClientQuotasRequest createAlterClientQuotasRequest() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+        ClientQuotaAlteration.Op op = new ClientQuotaAlteration.Op("request_percentage", 2.0);
+        ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, Collections.singleton(op));
+        return new AlterClientQuotasRequest.Builder(Collections.singleton(alteration), false).build((short) 0);
+    }
+
+    private AlterClientQuotasResponse createAlterClientQuotasResponse() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+        return new AlterClientQuotasResponse(Collections.singletonMap(entity, ApiError.NONE), 0);
+    }
 }
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index fdc83fd..3f968ef 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -23,6 +23,7 @@ import java.util.{Collections, Properties}
 import joptsimple._
 import kafka.common.Config
 import kafka.log.LogConfig
+import kafka.server.DynamicConfig.QuotaConfigs
 import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncoder}
 import kafka.utils.Implicits._
@@ -364,6 +365,14 @@ object ConfigCommand extends Config {
         adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
 
       case ConfigType.User | ConfigType.Client =>
+        val nonQuotaConfigsToAdd = configsToBeAdded.keys.filterNot(QuotaConfigs.isQuotaConfig)
+        if (nonQuotaConfigsToAdd.nonEmpty)
+          throw new IllegalArgumentException(s"Only quota configs can be added for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToAdd")
+        val nonQuotaConfigsToDelete = configsToBeDeleted.filterNot(QuotaConfigs.isQuotaConfig)
+        if (nonQuotaConfigsToDelete.nonEmpty)
+          throw new IllegalArgumentException(s"Only quota configs can be deleted for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToDelete")
+
+
         val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames)
 
         val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 183a5d3..5fc411d 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.utils.Log4jController
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.DynamicConfig.QuotaConfigs
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.AlterConfigOp
@@ -864,19 +865,20 @@ class AdminManager(val config: KafkaConfig,
         !name.isDefined || !strict
     }
 
-    def fromProps(props: Properties): Map[String, Double] = {
-      props.asScala.map { case (key, value) =>
+    def fromProps(props: Map[String, String]): Map[String, Double] = {
+      props.map { case (key, value) =>
         val doubleValue = try value.toDouble catch {
           case _: NumberFormatException =>
-            throw new IllegalStateException(s"Unexpected client quota configuration value: ${key} -> ${value}")
+            throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value")
         }
         (key -> doubleValue)
       }
     }
 
     (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
-      if (!p.isEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
-        Some((userClientIdToEntity(u, c) -> fromProps(p)))
+      val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isQuotaConfig(key) }
+      if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
+        Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
       else
         None
     }.flatten.toMap
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 13c64bf..f3d40a1 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -67,11 +67,20 @@ object DynamicConfig {
     def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
   }
 
-  object Client {
-    //Properties
+  object QuotaConfigs {
     val ProducerByteRateOverrideProp = "producer_byte_rate"
     val ConsumerByteRateOverrideProp = "consumer_byte_rate"
     val RequestPercentageOverrideProp = "request_percentage"
+    private val configNames = Set(ProducerByteRateOverrideProp, ConsumerByteRateOverrideProp, RequestPercentageOverrideProp)
+
+    def isQuotaConfig(name: String): Boolean = configNames.contains(name)
+  }
+
+  object Client {
+    //Properties
+    val ProducerByteRateOverrideProp = QuotaConfigs.ProducerByteRateOverrideProp
+    val ConsumerByteRateOverrideProp = QuotaConfigs.ConsumerByteRateOverrideProp
+    val RequestPercentageOverrideProp = QuotaConfigs.RequestPercentageOverrideProp
 
     //Defaults
     val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 2b357b1..d412123 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -486,6 +486,27 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
+  def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node)
+
+    def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+      val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+        "--entity-type", entityType, "--entity-name", "admin",
+        "--alter") ++ alterOpts)
+      val e = intercept[IllegalArgumentException] {
+        ConfigCommand.alterConfig(mockAdminClient, opts)
+      }
+      assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config"))
+    }
+
+    verifyCommand("users", "--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000,some_config=10")
+    verifyCommand("clients", "--add-config", "some_config=10")
+    verifyCommand("users", "--delete-config", "consumer_byte_rate=20000,some_config=10")
+    verifyCommand("clients", "--delete-config", "some_config=10")
+  }
+
+  @Test
   def shouldAddTopicConfigUsingZookeeper(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
       "--entity-name", "my-topic",
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index b5c694a..6ddfd3b 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -23,9 +23,10 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
 import org.junit.Assert._
 import org.junit.Test
-
 import java.util.concurrent.{ExecutionException, TimeUnit}
 
+import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
+
 import scala.jdk.CollectionConverters._
 
 class ClientQuotasRequestTest extends BaseRequestTest {
@@ -37,6 +38,7 @@ class ClientQuotasRequestTest extends BaseRequestTest {
 
   @Test
   def testAlterClientQuotasRequest(): Unit = {
+
     val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava)
 
     // Expect an empty configuration.
@@ -162,6 +164,32 @@ class ClientQuotasRequestTest extends BaseRequestTest {
     ))
   }
 
+  @Test
+  def testClientQuotasForScramUsers(): Unit = {
+    val entityType = ConfigType.User
+    val userName = "user"
+
+    val mechanism = ScramMechanism.SCRAM_SHA_256
+    val credential = new ScramFormatter(mechanism).generateCredential("password", 4096)
+    val configs = adminZkClient.fetchEntityConfig(entityType, userName)
+    configs.setProperty(mechanism.mechanismName, ScramCredentialUtils.credentialToString(credential))
+    adminZkClient.changeConfigs(entityType, userName, configs)
+
+    val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)
+
+    verifyDescribeEntityQuotas(entity, Map.empty)
+
+    alterEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> Some(10000.0)),
+      (ConsumerByteRateProp -> Some(20000.0))
+    ), validateOnly = false)
+
+    verifyDescribeEntityQuotas(entity, Map(
+      (ProducerByteRateProp -> 10000.0),
+      (ConsumerByteRateProp -> 20000.0)
+    ))
+  }
+
   @Test(expected = classOf[InvalidRequestException])
   def testAlterClientQuotasBadUser(): Unit = {
     val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)