You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/03/15 06:03:52 UTC
[kafka] branch trunk updated: KIP-546: Implement
describeClientQuotas and alterClientQuotas. (#8083)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 227a732 KIP-546: Implement describeClientQuotas and alterClientQuotas. (#8083)
227a732 is described below
commit 227a7322b77840e08924b9486e4bda2f3dfc1f1a
Author: Brian Byrne <bb...@confluent.io>
AuthorDate: Sat Mar 14 23:03:13 2020 -0700
KIP-546: Implement describeClientQuotas and alterClientQuotas. (#8083)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
build.gradle | 2 +-
checkstyle/import-control.xml | 7 +-
checkstyle/suppressions.xml | 2 +-
.../java/org/apache/kafka/clients/admin/Admin.java | 82 +++-
.../clients/admin/AlterClientQuotasOptions.java | 46 +++
.../clients/admin/AlterClientQuotasResult.java | 58 +++
.../clients/admin/DescribeClientQuotasOptions.java | 29 ++
.../clients/admin/DescribeClientQuotasResult.java | 52 +++
.../kafka/clients/admin/KafkaAdminClient.java | 68 +++-
.../org/apache/kafka/common/protocol/ApiKeys.java | 8 +-
.../kafka/common/quota/ClientQuotaAlteration.java | 106 +++++
.../kafka/common/quota/ClientQuotaEntity.java | 70 ++++
.../kafka/common/quota/ClientQuotaFilter.java | 101 +++++
.../common/quota/ClientQuotaFilterComponent.java | 109 +++++
.../kafka/common/requests/AbstractRequest.java | 4 +
.../kafka/common/requests/AbstractResponse.java | 4 +
.../common/requests/AlterClientQuotasRequest.java | 133 +++++++
.../common/requests/AlterClientQuotasResponse.java | 124 ++++++
.../requests/DescribeClientQuotasRequest.java | 121 ++++++
.../requests/DescribeClientQuotasResponse.java | 119 ++++++
.../common/message/AlterClientQuotasRequest.json | 45 +++
.../common/message/AlterClientQuotasResponse.json | 40 ++
.../message/DescribeClientQuotasRequest.json | 35 ++
.../message/DescribeClientQuotasResponse.json | 47 +++
.../kafka/clients/admin/KafkaAdminClientTest.java | 95 +++++
.../kafka/clients/admin/MockAdminClient.java | 12 +
.../src/main/scala/kafka/admin/ConfigCommand.scala | 131 ++++--
.../src/main/scala/kafka/server/AdminManager.scala | 199 ++++++++-
.../main/scala/kafka/server/DynamicConfig.scala | 4 +
core/src/main/scala/kafka/server/KafkaApis.scala | 30 ++
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 144 ++++++-
.../kafka/server/ClientQuotasRequestTest.scala | 443 +++++++++++++++++++++
32 files changed, 2416 insertions(+), 54 deletions(-)
diff --git a/build.gradle b/build.gradle
index 975d56b..05fc20f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,7 +457,7 @@ subprojects {
// See https://www.lightbend.com/blog/scala-inliner-optimizer for more information about the optimizer.
scalaCompileOptions.additionalParameters += ["-opt:l:inline"]
scalaCompileOptions.additionalParameters += inlineFrom
-
+
// these options are valid for Scala versions < 2.13 only
// Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
if (versions.baseScala == '2.12') {
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e3975a3..c4a7662 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -145,6 +145,7 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.network" />
+ <allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.record" />
@@ -162,6 +163,10 @@
<subpackage name="utils">
<allow pkg="org.apache.kafka.common" />
</subpackage>
+
+ <subpackage name="quotas">
+ <allow pkg="org.apache.kafka.common" />
+ </subpackage>
</subpackage>
<subpackage name="clients">
@@ -241,7 +246,7 @@
<subpackage name="perf">
<allow pkg="com.fasterxml.jackson.databind" />
</subpackage>
-
+
<subpackage name="integration">
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f533179..aa3a086 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -62,7 +62,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager).java"/>
<suppress checks="JavaNCSS"
- files="(AbstractRequest|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
+ files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest).java"/>
<suppress checks="NPathComplexity"
files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fd70e97..b99fd5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.LeaveGroupResponse;
/**
@@ -408,7 +410,6 @@ public interface Admin extends AutoCloseable {
return incrementalAlterConfigs(configs, new AlterConfigsOptions());
}
-
/**
* Incrementally update the configuration for the specified resources.
* <p>
@@ -1133,6 +1134,85 @@ public interface Admin extends AutoCloseable {
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options);
/**
+ * Describes all entities matching the provided filter that have at least one client quota configuration
+ * value defined.
+ * <p>
+ * This is a convenience method for {@link #describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)}
+ * with default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param filter the filter to apply to match entities
+ * @return the DescribeClientQuotasResult containing the result
+ */
+ default DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter) {
+ return describeClientQuotas(filter, new DescribeClientQuotasOptions());
+ }
+
+ /**
+ * Describes all entities matching the provided filter that have at least one client quota configuration
+ * value defined.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the future from the
+ * returned {@link DescribeClientQuotasResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have describe access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+ * If the request details are invalid. e.g., an invalid entity type was specified.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the describe could finish.</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param filter the filter to apply to match entities
+ * @param options the options to use
+ * @return the DescribeClientQuotasResult containing the result
+ */
+ DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
+
+ /**
+ * Alters client quota configurations with the specified alterations.
+ * <p>
+ * This is a convenience method for {@link #alterClientQuotas(Collection, AlterClientQuotasOptions)}
+ * with default options. See the overload for more details.
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param entries the alterations to perform
+ * @return the AlterClientQuotasResult containing the result
+ */
+ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries) {
+ return alterClientQuotas(entries, new AlterClientQuotasOptions());
+ }
+
+ /**
+ * Alters client quota configurations with the specified alterations.
+ * <p>
+ * Alterations for a single entity are atomic, but atomicity across entities is not guaranteed. The resulting
+ * per-entity error code should be evaluated to resolve the success or failure of all updates.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
+ * the returned {@link AlterClientQuotasResult}:
+ * <ul>
+ * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
+ * If the authenticated user didn't have alter access to the cluster.</li>
+ * <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
+ * If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
+ * <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ * If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
+ * succeeded or not.</li>
+ * </ul>
+ * <p>
+ * This operation is supported by brokers with version 2.6.0 or higher.
+ *
+ * @param entries the alterations to perform
+ * @return the AlterClientQuotasResult containing the result
+ */
+ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
+
+ /**
* Get the metrics kept by the adminClient
*/
Map<MetricName, ? extends Metric> metrics();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasOptions.java
new file mode 100644
index 0000000..3cdaa97
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {
+
+ private boolean validateOnly = false;
+
+ /**
+ * Returns whether the request should be validated without altering the configs.
+ */
+ public boolean validateOnly() {
+ return this.validateOnly;
+ }
+
+ /**
+ * Sets whether the request should be validated without altering the configs.
+ */
+ public AlterClientQuotasOptions validateOnly(boolean validateOnly) {
+ this.validateOnly = validateOnly;
+ return this;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasResult.java
new file mode 100644
index 0000000..63c6b3e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterClientQuotasResult.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class AlterClientQuotasResult {
+
+ private final Map<ClientQuotaEntity, KafkaFuture<Void>> futures;
+
+ /**
+ * Maps an entity to its alteration result.
+ *
+ * @param futures maps entity to its alteration result
+ */
+ public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Returns a map from quota entity to a future which can be used to check the status of the operation.
+ */
+ public Map<ClientQuotaEntity, KafkaFuture<Void>> values() {
+ return futures;
+ }
+
+ /**
+ * Returns a future which succeeds only if all quota alterations succeed.
+ */
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasOptions.java
new file mode 100644
index 0000000..14e7e45
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link Admin#describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> {
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasResult.java
new file mode 100644
index 0000000..b485590
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClientQuotasResult.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link Admin#describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeClientQuotasResult {
+
+ private final KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities;
+
+ /**
+ * Maps an entity to its configured quota value(s). Note if no value is defined for a quota
+ * type for that entity's config, then it is not included in the resulting value map.
+ *
+ * @param entities future for the collection of entities that matched the filter
+ */
+ public DescribeClientQuotasResult(KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities) {
+ this.entities = entities;
+ }
+
+ /**
+ * Returns a future which yields a map from each matching quota entity to its configured quota values.
+ */
+ public KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities() {
+ return entities;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dbe3d1a..e654e73 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -129,8 +129,13 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AlterClientQuotasRequest;
+import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
@@ -156,6 +161,8 @@ import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsRequest;
import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeClientQuotasRequest;
+import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
@@ -3748,7 +3755,7 @@ public class KafkaAdminClient extends AdminClient {
MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> context =
new MetadataOperationContext<>(topics, options, deadline, futures);
- Call metadataCall = getMetadataCall(context,
+ Call metadataCall = getMetadataCall(context,
() -> KafkaAdminClient.this.getListOffsetsCalls(context, topicPartitionOffsets, futures));
runnable.call(metadataCall, nowMetadata);
@@ -3845,6 +3852,65 @@ public class KafkaAdminClient extends AdminClient {
return calls;
}
+ @Override
+ public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) {
+ KafkaFutureImpl<Map<ClientQuotaEntity, Map<String, Double>>> future = new KafkaFutureImpl<>();
+
+ final long now = time.milliseconds();
+ runnable.call(new Call("describeClientQuotas", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) {
+ return new DescribeClientQuotasRequest.Builder(filter);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse;
+ response.complete(future);
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ }, now);
+
+ return new DescribeClientQuotasResult(future);
+ }
+
+ @Override
+ public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options) {
+ Map<ClientQuotaEntity, KafkaFutureImpl<Void>> futures = new HashMap<>(entries.size());
+ for (ClientQuotaAlteration entry : entries) {
+ futures.put(entry.entity(), new KafkaFutureImpl<>());
+ }
+
+ final long now = time.milliseconds();
+ runnable.call(new Call("alterClientQuotas", calcDeadlineMs(now, options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ AlterClientQuotasRequest.Builder createRequest(int timeoutMs) {
+ return new AlterClientQuotasRequest.Builder(entries, options.validateOnly());
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ AlterClientQuotasResponse response = (AlterClientQuotasResponse) abstractResponse;
+ response.complete(futures);
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(futures.values(), throwable);
+ }
+ }, now);
+
+ return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
+ }
+
/**
* Get a sub level error when the request is in batch. If given key was not found,
* return an {@link IllegalArgumentException}.
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 4fc8287..6549d8f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -19,6 +19,8 @@ package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData;
import org.apache.kafka.common.message.CreateAclsRequestData;
@@ -39,6 +41,8 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.DescribeAclsRequestData;
import org.apache.kafka.common.message.DescribeAclsResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
import org.apache.kafka.common.message.DescribeDelegationTokenRequestData;
import org.apache.kafka.common.message.DescribeDelegationTokenResponseData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
@@ -205,7 +209,9 @@ public enum ApiKeys {
AlterPartitionReassignmentsResponseData.SCHEMAS),
LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS,
ListPartitionReassignmentsResponseData.SCHEMAS),
- OFFSET_DELETE(47, "OffsetDelete", OffsetDeleteRequestData.SCHEMAS, OffsetDeleteResponseData.SCHEMAS);
+ OFFSET_DELETE(47, "OffsetDelete", OffsetDeleteRequestData.SCHEMAS, OffsetDeleteResponseData.SCHEMAS),
+ DESCRIBE_CLIENT_QUOTAS(48, "DescribeClientQuotas", DescribeClientQuotasRequestData.SCHEMAS, DescribeClientQuotasResponseData.SCHEMAS),
+ ALTER_CLIENT_QUOTAS(49, "AlterClientQuotas", AlterClientQuotasRequestData.SCHEMAS, AlterClientQuotasResponseData.SCHEMAS);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaAlteration.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaAlteration.java
new file mode 100644
index 0000000..88670ce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaAlteration.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Describes a configuration alteration to be made to a client quota entity.
+ */
+public class ClientQuotaAlteration {
+
+ public static class Op {
+ private final String key;
+ private final Double value;
+
+ /**
+ * @param key the quota type to alter
+ * @param value if set then the existing value is updated,
+ * otherwise if null, the existing value is cleared
+ */
+ public Op(String key, Double value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * @return the quota type to alter
+ */
+ public String key() {
+ return this.key;
+ }
+
+ /**
+ * @return if set then the existing value is updated,
+ * otherwise if null, the existing value is cleared
+ */
+ public Double value() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Op that = (Op) o;
+ return Objects.equals(key, that.key) && Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+
+ @Override
+ public String toString() {
+ return "ClientQuotaAlteration.Op(key=" + key + ", value=" + value + ")";
+ }
+ }
+
+ private final ClientQuotaEntity entity;
+ private final Collection<Op> ops;
+
+ /**
+ * @param entity the entity whose config will be modified
+ * @param ops the alteration to perform
+ */
+ public ClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops) {
+ this.entity = entity;
+ this.ops = ops;
+ }
+
+ /**
+ * @return the entity whose config will be modified
+ */
+ public ClientQuotaEntity entity() {
+ return this.entity;
+ }
+
+ /**
+ * @return the alteration to perform
+ */
+ public Collection<Op> ops() {
+ return this.ops;
+ }
+
+ @Override
+ public String toString() {
+ return "ClientQuotaAlteration(entity=" + entity + ", ops=" + ops + ")";
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java
new file mode 100644
index 0000000..0fee3d3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Describes a client quota entity, which is a mapping of entity types to their names.
+ */
+public class ClientQuotaEntity {
+
+ private final Map<String, String> entries;
+
+ /**
+ * The type of an entity entry.
+ */
+ public static final String USER = "user";
+ public static final String CLIENT_ID = "client-id";
+
+ /**
+ * Constructs a quota entity for the given types and names. If a name is null,
+ * then it is mapped to the built-in default entity name.
+ *
+ * @param entries maps entity type to its name
+ */
+ public ClientQuotaEntity(Map<String, String> entries) {
+ this.entries = entries;
+ }
+
+ /**
+ * @return map of entity type to its name
+ */
+ public Map<String, String> entries() {
+ return this.entries;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClientQuotaEntity that = (ClientQuotaEntity) o;
+ return Objects.equals(entries, that.entries);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(entries);
+ }
+
+ @Override
+ public String toString() {
+ return "ClientQuotaEntity(entries=" + entries + ")";
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
new file mode 100644
index 0000000..e8a6a72
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * A filter over client quota entities, made of a collection of per-entity-type
+ * components plus a flag saying whether the match is strict.
+ */
+public class ClientQuotaFilter {
+
+    private final Collection<ClientQuotaFilterComponent> components;
+    private final boolean strict;
+
+    /**
+     * Creates a filter; instances are obtained through the static factory methods.
+     *
+     * @param components the components to filter on (kept by reference, not copied)
+     * @param strict whether the filter only includes specified components
+     */
+    private ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict) {
+        this.components = components;
+        this.strict = strict;
+    }
+
+    /**
+     * Returns a non-strict filter that matches all provided components. Matching entities
+     * with entity types that are not specified by a component will also be included in
+     * the result.
+     *
+     * @param components the components for the filter
+     */
+    public static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components) {
+        return new ClientQuotaFilter(components, false);
+    }
+
+    /**
+     * Returns a strict filter that matches all provided components. Matching entities
+     * with entity types that are not specified by a component will *not* be included in
+     * the result.
+     *
+     * @param components the components for the filter
+     */
+    public static ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components) {
+        return new ClientQuotaFilter(components, true);
+    }
+
+    /**
+     * Returns a filter that matches all configured entities: a non-strict filter with
+     * no components.
+     */
+    public static ClientQuotaFilter all() {
+        return contains(Collections.emptyList());
+    }
+
+    /**
+     * @return the filter's components
+     */
+    public Collection<ClientQuotaFilterComponent> components() {
+        return components;
+    }
+
+    /**
+     * @return whether the filter is strict, i.e. only includes specified components
+     */
+    public boolean strict() {
+        return strict;
+    }
+
+    /**
+     * Two filters are equal when they are of the same class, their component
+     * collections are equal and their strict flags agree.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        ClientQuotaFilter other = (ClientQuotaFilter) o;
+        return strict == other.strict && Objects.equals(components, other.components);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(components, strict);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientQuotaFilter(components=" + components + ", strict=" + strict + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilterComponent.java b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilterComponent.java
new file mode 100644
index 0000000..b981ead
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilterComponent.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.quota;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Describes a component for applying a client quota filter: an entity type together
+ * with the kind of name match to perform for that type.
+ */
+public class ClientQuotaFilterComponent {
+
+    private final String entityType;
+    // Three-state value: present = exact name, empty = default name, null = any specified name.
+    private final Optional<String> match;
+
+    /**
+     * A filter to be applied.
+     *
+     * @param entityType the entity type the filter component applies to; must not be null
+     * @param match if present, the name that's matched exactly
+     *              if empty, matches the default name
+     *              if null, matches any specified name
+     * @throws NullPointerException if {@code entityType} is null
+     */
+    private ClientQuotaFilterComponent(String entityType, Optional<String> match) {
+        this.entityType = Objects.requireNonNull(entityType);
+        this.match = match;
+    }
+
+    /**
+     * Constructs and returns a filter component that exactly matches the provided entity
+     * name for the entity type.
+     *
+     * @param entityType the entity type the filter component applies to
+     * @param entityName the entity name that's matched exactly
+     * @throws NullPointerException if {@code entityType} or {@code entityName} is null
+     */
+    public static ClientQuotaFilterComponent ofEntity(String entityType, String entityName) {
+        return new ClientQuotaFilterComponent(entityType, Optional.of(Objects.requireNonNull(entityName)));
+    }
+
+    /**
+     * Constructs and returns a filter component that matches the built-in default entity name
+     * for the entity type.
+     *
+     * @param entityType the entity type the filter component applies to
+     * @throws NullPointerException if {@code entityType} is null
+     */
+    public static ClientQuotaFilterComponent ofDefaultEntity(String entityType) {
+        return new ClientQuotaFilterComponent(entityType, Optional.empty());
+    }
+
+    /**
+     * Constructs and returns a filter component that matches any specified name for the
+     * entity type.
+     *
+     * @param entityType the entity type the filter component applies to
+     * @throws NullPointerException if {@code entityType} is null
+     */
+    public static ClientQuotaFilterComponent ofEntityType(String entityType) {
+        return new ClientQuotaFilterComponent(entityType, null);
+    }
+
+    /**
+     * @return the component's entity type
+     */
+    public String entityType() {
+        return this.entityType;
+    }
+
+    /**
+     * @return the optional match string, where:
+     *         if present, the name that's matched exactly
+     *         if empty, matches the default name
+     *         if null, matches any specified name
+     */
+    public Optional<String> match() {
+        return this.match;
+    }
+
+    /**
+     * Two components are equal when they are of the same class and have the same entity
+     * type and the same match state (exact name, default, or any specified name).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ClientQuotaFilterComponent that = (ClientQuotaFilterComponent) o;
+        // Compare each field with the other instance's field; Objects.equals tolerates
+        // the null match used by ofEntityType.
+        return Objects.equals(entityType, that.entityType) && Objects.equals(match, that.match);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(entityType, match);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientQuotaFilterComponent(entityType=" + entityType + ", match=" + match + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 97bc728..f4085b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -239,6 +239,10 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return new ListPartitionReassignmentsRequest(struct, apiVersion);
case OFFSET_DELETE:
return new OffsetDeleteRequest(struct, apiVersion);
+ case DESCRIBE_CLIENT_QUOTAS:
+ return new DescribeClientQuotasRequest(struct, apiVersion);
+ case ALTER_CLIENT_QUOTAS:
+ return new AlterClientQuotasRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 37f60dc..9d3f39d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -176,6 +176,10 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return new ListPartitionReassignmentsResponse(struct, version);
case OFFSET_DELETE:
return new OffsetDeleteResponse(struct, version);
+ case DESCRIBE_CLIENT_QUOTAS:
+ return new DescribeClientQuotasResponse(struct, version);
+ case ALTER_CLIENT_QUOTAS:
+ return new AlterClientQuotasResponse(struct, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
new file mode 100644
index 0000000..6d4c2a1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterClientQuotasRequestData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData.EntityData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData.EntryData;
+import org.apache.kafka.common.message.AlterClientQuotasRequestData.OpData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterClientQuotasRequest extends AbstractRequest {
+
+ public static class Builder extends AbstractRequest.Builder<AlterClientQuotasRequest> {
+
+ private final AlterClientQuotasRequestData data;
+
+ public Builder(Collection<ClientQuotaAlteration> entries, boolean validateOnly) {
+ super(ApiKeys.ALTER_CLIENT_QUOTAS);
+
+ List<EntryData> entryData = new ArrayList<>(entries.size());
+ for (ClientQuotaAlteration entry : entries) {
+ List<EntityData> entityData = new ArrayList<>(entry.entity().entries().size());
+ for (Map.Entry<String, String> entityEntries : entry.entity().entries().entrySet()) {
+ entityData.add(new EntityData()
+ .setEntityType(entityEntries.getKey())
+ .setEntityName(entityEntries.getValue()));
+ }
+
+ List<OpData> opData = new ArrayList<>(entry.ops().size());
+ for (ClientQuotaAlteration.Op op : entry.ops()) {
+ opData.add(new OpData()
+ .setKey(op.key())
+ .setValue(op.value() == null ? 0.0 : op.value())
+ .setRemove(op.value() == null));
+ }
+
+ entryData.add(new EntryData()
+ .setEntity(entityData)
+ .setOps(opData));
+ }
+
+ this.data = new AlterClientQuotasRequestData()
+ .setEntries(entryData)
+ .setValidateOnly(validateOnly);
+ }
+
+ @Override
+ public AlterClientQuotasRequest build(short version) {
+ return new AlterClientQuotasRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final AlterClientQuotasRequestData data;
+
+ public AlterClientQuotasRequest(AlterClientQuotasRequestData data, short version) {
+ super(ApiKeys.ALTER_CLIENT_QUOTAS, version);
+ this.data = data;
+ }
+
+ public AlterClientQuotasRequest(Struct struct, short version) {
+ super(ApiKeys.ALTER_CLIENT_QUOTAS, version);
+ this.data = new AlterClientQuotasRequestData(struct, version);
+ }
+
+ public Collection<ClientQuotaAlteration> entries() {
+ List<ClientQuotaAlteration> entries = new ArrayList<>(data.entries().size());
+ for (EntryData entryData : data.entries()) {
+ Map<String, String> entity = new HashMap<>(entryData.entity().size());
+ for (EntityData entityData : entryData.entity()) {
+ entity.put(entityData.entityType(), entityData.entityName());
+ }
+
+ List<ClientQuotaAlteration.Op> ops = new ArrayList<>(entryData.ops().size());
+ for (OpData opData : entryData.ops()) {
+ Double value = opData.remove() ? null : opData.value();
+ ops.add(new ClientQuotaAlteration.Op(opData.key(), value));
+ }
+
+ entries.add(new ClientQuotaAlteration(new ClientQuotaEntity(entity), ops));
+ }
+ return entries;
+ }
+
+ public boolean validateOnly() {
+ return data.validateOnly();
+ }
+
+ @Override
+ public AlterClientQuotasResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ ArrayList<ClientQuotaEntity> entities = new ArrayList<>(data.entries().size());
+ for (EntryData entryData : data.entries()) {
+ Map<String, String> entity = new HashMap<>(entryData.entity().size());
+ for (EntityData entityData : entryData.entity()) {
+ entity.put(entityData.entityType(), entityData.entityName());
+ }
+ entities.add(new ClientQuotaEntity(entity));
+ }
+ return new AlterClientQuotasResponse(entities, throttleTimeMs, e);
+ }
+
+ @Override
+ protected Struct toStruct() {
+ return data.toStruct(version());
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
new file mode 100644
index 0000000..7e4e891
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterClientQuotasResponse extends AbstractResponse {
+
+ private final AlterClientQuotasResponseData data;
+
+ public AlterClientQuotasResponse(Map<ClientQuotaEntity, ApiError> result, int throttleTimeMs) {
+ List<EntryData> entries = new ArrayList<>(result.size());
+ for (Map.Entry<ClientQuotaEntity, ApiError> entry : result.entrySet()) {
+ ApiError e = entry.getValue();
+ entries.add(new EntryData()
+ .setErrorCode(e.error().code())
+ .setErrorMessage(e.message())
+ .setEntity(toEntityData(entry.getKey())));
+ }
+
+ this.data = new AlterClientQuotasResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setEntries(entries);
+ }
+
+ public AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int throttleTimeMs, Throwable e) {
+ short errorCode = Errors.forException(e).code();
+ String errorMessage = e.getMessage();
+
+ List<EntryData> entries = new ArrayList<>(entities.size());
+ for (ClientQuotaEntity entity : entities) {
+ entries.add(new EntryData()
+ .setErrorCode(errorCode)
+ .setErrorMessage(errorMessage)
+ .setEntity(toEntityData(entity)));
+ }
+
+ this.data = new AlterClientQuotasResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setEntries(entries);
+ }
+
+ public AlterClientQuotasResponse(Struct struct, short version) {
+ this.data = new AlterClientQuotasResponseData(struct, version);
+ }
+
+ public void complete(Map<ClientQuotaEntity, KafkaFutureImpl<Void>> futures) {
+ for (EntryData entryData : data.entries()) {
+ Map<String, String> entityEntries = new HashMap<>(entryData.entity().size());
+ for (EntityData entityData : entryData.entity()) {
+ entityEntries.put(entityData.entityType(), entityData.entityName());
+ }
+ ClientQuotaEntity entity = new ClientQuotaEntity(entityEntries);
+
+ KafkaFutureImpl<Void> future = futures.get(entity);
+ if (future == null) {
+ throw new IllegalArgumentException("Future map must contain entity " + entity);
+ }
+
+ Errors error = Errors.forCode(entryData.errorCode());
+ if (error == Errors.NONE) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(error.exception(entryData.errorMessage()));
+ }
+ }
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> counts = new HashMap<>();
+ for (EntryData entry : data.entries()) {
+ Errors error = Errors.forCode(entry.errorCode());
+ counts.put(error, counts.getOrDefault(error, 0) + 1);
+ }
+ return counts;
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ return data.toStruct(version);
+ }
+
+ private static List<EntityData> toEntityData(ClientQuotaEntity entity) {
+ List<AlterClientQuotasResponseData.EntityData> entityData = new ArrayList<>(entity.entries().size());
+ for (Map.Entry<String, String> entry : entity.entries().entrySet()) {
+ entityData.add(new AlterClientQuotasResponseData.EntityData()
+ .setEntityType(entry.getKey())
+ .setEntityName(entry.getValue()));
+ }
+ return entityData;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasRequest.java
new file mode 100644
index 0000000..a5496ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasRequest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData.ComponentData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collection;
+
+public class DescribeClientQuotasRequest extends AbstractRequest {
+ // These values must not change.
+ private static final byte MATCH_TYPE_EXACT = 0;
+ private static final byte MATCH_TYPE_DEFAULT = 1;
+ private static final byte MATCH_TYPE_SPECIFIED = 2;
+
+ public static class Builder extends AbstractRequest.Builder<DescribeClientQuotasRequest> {
+
+ private final DescribeClientQuotasRequestData data;
+
+ public Builder(ClientQuotaFilter filter) {
+ super(ApiKeys.DESCRIBE_CLIENT_QUOTAS);
+
+ List<ComponentData> componentData = new ArrayList<>(filter.components().size());
+ for (ClientQuotaFilterComponent component : filter.components()) {
+ ComponentData fd = new ComponentData().setEntityType(component.entityType());
+ if (component.match() == null) {
+ fd.setMatchType(MATCH_TYPE_SPECIFIED);
+ fd.setMatch(null);
+ } else if (component.match().isPresent()) {
+ fd.setMatchType(MATCH_TYPE_EXACT);
+ fd.setMatch(component.match().get());
+ } else {
+ fd.setMatchType(MATCH_TYPE_DEFAULT);
+ fd.setMatch(null);
+ }
+ componentData.add(fd);
+ }
+ this.data = new DescribeClientQuotasRequestData()
+ .setComponents(componentData)
+ .setStrict(filter.strict());
+ }
+
+ @Override
+ public DescribeClientQuotasRequest build(short version) {
+ return new DescribeClientQuotasRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final DescribeClientQuotasRequestData data;
+
+ public DescribeClientQuotasRequest(DescribeClientQuotasRequestData data, short version) {
+ super(ApiKeys.DESCRIBE_CLIENT_QUOTAS, version);
+ this.data = data;
+ }
+
+ public DescribeClientQuotasRequest(Struct struct, short version) {
+ super(ApiKeys.DESCRIBE_CLIENT_QUOTAS, version);
+ this.data = new DescribeClientQuotasRequestData(struct, version);
+ }
+
+ public ClientQuotaFilter filter() {
+ List<ClientQuotaFilterComponent> components = new ArrayList<>(data.components().size());
+ for (ComponentData componentData : data.components()) {
+ ClientQuotaFilterComponent component;
+ switch (componentData.matchType()) {
+ case MATCH_TYPE_EXACT:
+ component = ClientQuotaFilterComponent.ofEntity(componentData.entityType(), componentData.match());
+ break;
+ case MATCH_TYPE_DEFAULT:
+ component = ClientQuotaFilterComponent.ofDefaultEntity(componentData.entityType());
+ break;
+ case MATCH_TYPE_SPECIFIED:
+ component = ClientQuotaFilterComponent.ofEntityType(componentData.entityType());
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected match type: " + componentData.matchType());
+ }
+ components.add(component);
+ }
+ if (data.strict()) {
+ return ClientQuotaFilter.containsOnly(components);
+ } else {
+ return ClientQuotaFilter.contains(components);
+ }
+ }
+
+ @Override
+ public DescribeClientQuotasResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ return new DescribeClientQuotasResponse(throttleTimeMs, e);
+ }
+
+ @Override
+ protected Struct toStruct() {
+ return data.toStruct(version());
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
new file mode 100644
index 0000000..6ed5b1b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeClientQuotasResponse extends AbstractResponse {
+
+ private final DescribeClientQuotasResponseData data;
+
+ public DescribeClientQuotasResponse(Map<ClientQuotaEntity, Map<String, Double>> entities, int throttleTimeMs) {
+ List<EntryData> entries = new ArrayList<>(entities.size());
+ for (Map.Entry<ClientQuotaEntity, Map<String, Double>> entry : entities.entrySet()) {
+ ClientQuotaEntity quotaEntity = entry.getKey();
+ List<EntityData> entityData = new ArrayList<>(quotaEntity.entries().size());
+ for (Map.Entry<String, String> entityEntry : quotaEntity.entries().entrySet()) {
+ entityData.add(new EntityData()
+ .setEntityType(entityEntry.getKey())
+ .setEntityName(entityEntry.getValue()));
+ }
+
+ Map<String, Double> quotaValues = entry.getValue();
+ List<ValueData> valueData = new ArrayList<>(quotaValues.size());
+ for (Map.Entry<String, Double> valuesEntry : entry.getValue().entrySet()) {
+ valueData.add(new ValueData()
+ .setKey(valuesEntry.getKey())
+ .setValue(valuesEntry.getValue()));
+ }
+
+ entries.add(new EntryData()
+ .setEntity(entityData)
+ .setValues(valueData));
+ }
+
+ this.data = new DescribeClientQuotasResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode((short) 0)
+ .setErrorMessage(null)
+ .setEntries(entries);
+ }
+
+ public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) {
+ this.data = new DescribeClientQuotasResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(Errors.forException(e).code())
+ .setErrorMessage(e.getMessage())
+ .setEntries(null);
+ }
+
+ public DescribeClientQuotasResponse(Struct struct, short version) {
+ this.data = new DescribeClientQuotasResponseData(struct, version);
+ }
+
+ public void complete(KafkaFutureImpl<Map<ClientQuotaEntity, Map<String, Double>>> future) {
+ Errors error = Errors.forCode(data.errorCode());
+ if (error != Errors.NONE) {
+ future.completeExceptionally(error.exception(data.errorMessage()));
+ return;
+ }
+
+ Map<ClientQuotaEntity, Map<String, Double>> result = new HashMap<>(data.entries().size());
+ for (EntryData entries : data.entries()) {
+ Map<String, String> entity = new HashMap<>(entries.entity().size());
+ for (EntityData entityData : entries.entity()) {
+ entity.put(entityData.entityType(), entityData.entityName());
+ }
+
+ Map<String, Double> values = new HashMap<>(entries.values().size());
+ for (ValueData valueData : entries.values()) {
+ values.put(valueData.key(), valueData.value());
+ }
+
+ result.put(new ClientQuotaEntity(entity), values);
+ }
+ future.complete(result);
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ return data.toStruct(version);
+ }
+}
diff --git a/clients/src/main/resources/common/message/AlterClientQuotasRequest.json b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
new file mode 100644
index 0000000..7e74d44
--- /dev/null
+++ b/clients/src/main/resources/common/message/AlterClientQuotasRequest.json
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 49,
+ "type": "request",
+ "name": "AlterClientQuotasRequest",
+ "validVersions": "0",
+ "flexibleVersions": "none",
+ "fields": [
+ { "name": "Entries", "type": "[]EntryData", "versions": "0+",
+ "about": "The quota configuration entries to alter.", "fields": [
+ { "name": "Entity", "type": "[]EntityData", "versions": "0+",
+ "about": "The quota entity to alter.", "fields": [
+ { "name": "EntityType", "type": "string", "versions": "0+",
+ "about": "The entity type." },
+ { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
+ "about": "The name of the entity, or null if the default." }
+ ]},
+ { "name": "Ops", "type": "[]OpData", "versions": "0+",
+ "about": "An individual quota configuration entry to alter.", "fields": [
+ { "name": "Key", "type": "string", "versions": "0+",
+ "about": "The quota configuration key." },
+ { "name": "Value", "type": "float64", "versions": "0+",
+ "about": "The value to set, otherwise ignored if the value is to be removed." },
+ { "name": "Remove", "type": "bool", "versions": "0+",
+ "about": "Whether the quota configuration value should be removed, otherwise set." }
+ ]}
+ ]},
+ { "name": "ValidateOnly", "type": "bool", "versions": "0+",
+ "about": "Whether the alteration should be validated, but not performed." }
+ ]
+}
diff --git a/clients/src/main/resources/common/message/AlterClientQuotasResponse.json b/clients/src/main/resources/common/message/AlterClientQuotasResponse.json
new file mode 100644
index 0000000..2f8fb88
--- /dev/null
+++ b/clients/src/main/resources/common/message/AlterClientQuotasResponse.json
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 49,
+ "type": "response",
+ "name": "AlterClientQuotasResponse",
+ "validVersions": "0",
+ "flexibleVersions": "none",
+ "fields": [
+ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+ "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+ { "name": "Entries", "type": "[]EntryData", "versions": "0+",
+ "about": "The result of the alteration for each quota entity.", "fields": [
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The error code, or `0` if the quota alteration succeeded." },
+ { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
+ "about": "The error message, or `null` if the quota alteration succeeded." },
+ { "name": "Entity", "type": "[]EntityData", "versions": "0+",
+ "about": "The quota entity that this result applies to.", "fields": [
+ { "name": "EntityType", "type": "string", "versions": "0+",
+ "about": "The entity type." },
+ { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
+ "about": "The name of the entity, or null if the default." }
+ ]}
+ ]}
+ ]
+}
diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
new file mode 100644
index 0000000..7abfd3c
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 48,
+ "type": "request",
+ "name": "DescribeClientQuotasRequest",
+ "validVersions": "0",
+ "flexibleVersions": "none",
+ "fields": [
+ { "name": "Components", "type": "[]ComponentData", "versions": "0+",
+ "about": "Filter components to apply to quota entities.", "fields": [
+ { "name": "EntityType", "type": "string", "versions": "0+",
+ "about": "The entity type that the filter component applies to." },
+ { "name": "MatchType", "type": "int8", "versions": "0+",
+ "about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." },
+ { "name": "Match", "type": "string", "versions": "0+", "nullableVersions": "0+",
+ "about": "The string to match against, or null if unused for the match type." }
+ ]},
+ { "name": "Strict", "type": "bool", "versions": "0+",
+ "about": "Whether the match is strict, i.e. should exclude entities with unspecified entity types." }
+ ]
+}
diff --git a/clients/src/main/resources/common/message/DescribeClientQuotasResponse.json b/clients/src/main/resources/common/message/DescribeClientQuotasResponse.json
new file mode 100644
index 0000000..5f5c784
--- /dev/null
+++ b/clients/src/main/resources/common/message/DescribeClientQuotasResponse.json
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 48,
+ "type": "response",
+ "name": "DescribeClientQuotasResponse",
+ "validVersions": "0",
+ "flexibleVersions": "none",
+ "fields": [
+ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+ "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The error code, or `0` if the quota description succeeded." },
+ { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
+ "about": "The error message, or `null` if the quota description succeeded." },
+ { "name": "Entries", "type": "[]EntryData", "versions": "0+", "nullableVersions": "0+",
+ "about": "A result entry.", "fields": [
+ { "name": "Entity", "type": "[]EntityData", "versions": "0+",
+ "about": "The quota entity description.", "fields": [
+ { "name": "EntityType", "type": "string", "versions": "0+",
+ "about": "The entity type." },
+ { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
+ "about": "The entity name, or null if the default." }
+ ]},
+ { "name": "Values", "type": "[]ValueData", "versions": "0+",
+ "about": "The quota values for the entity.", "fields": [
+ { "name": "Key", "type": "string", "versions": "0+",
+ "about": "The quota configuration key." },
+ { "name": "Value", "type": "float64", "versions": "0+",
+ "about": "The quota configuration value." }
+ ]}
+ ]}
+ ]
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 72c5fa9..b6f8417 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -89,6 +89,11 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -101,6 +106,7 @@ import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.ElectLeadersResponse;
@@ -638,6 +644,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().createPendingAuthenticationError(cluster.nodes().get(0),
TimeUnit.DAYS.toMillis(1));
callAdminClientApisAndExpectAnAuthenticationError(env);
+ callClientQuotasApisAndExpectAnAuthenticationError(env);
}
}
@@ -696,6 +703,26 @@ public class KafkaAdminClientTest {
}
}
+ private void callClientQuotasApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
+ try {
+ env.adminClient().describeClientQuotas(ClientQuotaFilter.all()).entities().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
+ }
+
+ try {
+ ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"));
+ ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, asList(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)));
+ env.adminClient().alterClientQuotas(asList(alteration)).all().get();
+ fail("Expected an authentication error.");
+ } catch (ExecutionException e) {
+ assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e),
+ e.getCause() instanceof AuthenticationException);
+ }
+ }
+
private static final AclBinding ACL1 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
private static final AclBinding ACL2 = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic4", PatternType.LITERAL),
@@ -2810,6 +2837,74 @@ public class KafkaAdminClientTest {
}
}
+    /**
+     * Builds a {@link ClientQuotaEntity} from alternating (entity type, entity name)
+     * arguments. Asserts that an even number of arguments was supplied.
+     */
+    private ClientQuotaEntity newClientQuotaEntity(String... args) {
+        assertTrue(args.length % 2 == 0);
+
+        int pairCount = args.length / 2;
+        Map<String, String> entries = new HashMap<>(pairCount);
+        for (int pair = 0; pair < pairCount; pair++) {
+            String entityType = args[2 * pair];
+            String entityName = args[2 * pair + 1];
+            entries.put(entityType, entityName);
+        }
+        return new ClientQuotaEntity(entries);
+    }
+
+    /**
+     * Verifies that describeClientQuotas returns exactly the entities and quota
+     * values carried by the prepared DescribeClientQuotasResponse.
+     */
+    @Test
+    public void testDescribeClientQuotas() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            final String value = "value";
+
+            Map<ClientQuotaEntity, Map<String, Double>> responseData = new HashMap<>();
+            ClientQuotaEntity entity1 = newClientQuotaEntity(ClientQuotaEntity.USER, "user-1", ClientQuotaEntity.CLIENT_ID, value);
+            ClientQuotaEntity entity2 = newClientQuotaEntity(ClientQuotaEntity.USER, "user-2", ClientQuotaEntity.CLIENT_ID, value);
+            responseData.put(entity1, Collections.singletonMap("consumer_byte_rate", 10000.0));
+            responseData.put(entity2, Collections.singletonMap("producer_byte_rate", 20000.0));
+
+            env.kafkaClient().prepareResponse(new DescribeClientQuotasResponse(responseData, 0));
+
+            ClientQuotaFilter filter = ClientQuotaFilter.contains(asList(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, value)));
+
+            DescribeClientQuotasResult result = env.adminClient().describeClientQuotas(filter);
+            Map<ClientQuotaEntity, Map<String, Double>> resultData = result.entities().get();
+
+            // JUnit's assertEquals takes (expected, actual); keeping that order makes
+            // failure messages report the right side as the unexpected value.
+            assertEquals(2, resultData.size());
+            assertTrue(resultData.containsKey(entity1));
+            Map<String, Double> config1 = resultData.get(entity1);
+            assertEquals(1, config1.size());
+            assertEquals(10000.0, config1.get("consumer_byte_rate"), 1e-6);
+            assertTrue(resultData.containsKey(entity2));
+            Map<String, Double> config2 = resultData.get(entity2);
+            assertEquals(1, config2.size());
+            assertEquals(20000.0, config2.get("producer_byte_rate"), 1e-6);
+        }
+    }
+
+ public void testAlterClientQuotas() throws Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ ClientQuotaEntity goodEntity = newClientQuotaEntity(ClientQuotaEntity.USER, "user-1");
+ ClientQuotaEntity unauthorizedEntity = newClientQuotaEntity(ClientQuotaEntity.USER, "user-0");
+ ClientQuotaEntity invalidEntity = newClientQuotaEntity("", "user-0");
+
+ Map<ClientQuotaEntity, ApiError> responseData = new HashMap<>(2);
+ responseData.put(goodEntity, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Authorization failed"));
+ responseData.put(unauthorizedEntity, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Authorization failed"));
+ responseData.put(invalidEntity, new ApiError(Errors.INVALID_REQUEST, "Invalid quota entity"));
+
+ env.kafkaClient().prepareResponse(new AlterClientQuotasResponse(responseData, 0));
+
+ List<ClientQuotaAlteration> entries = new ArrayList<>(3);
+ entries.add(new ClientQuotaAlteration(goodEntity, Collections.singleton(new ClientQuotaAlteration.Op("consumer_byte_rate", 10000.0))));
+ entries.add(new ClientQuotaAlteration(unauthorizedEntity, Collections.singleton(new ClientQuotaAlteration.Op("producer_byte_rate", 10000.0))));
+ entries.add(new ClientQuotaAlteration(invalidEntity, Collections.singleton(new ClientQuotaAlteration.Op("producer_byte_rate", 100.0))));
+
+ AlterClientQuotasResult result = env.adminClient().alterClientQuotas(entries);
+ result.values().get(goodEntity);
+ TestUtils.assertFutureError(result.values().get(unauthorizedEntity), ClusterAuthorizationException.class);
+ TestUtils.assertFutureError(result.values().get(invalidEntity), InvalidRequestException.class);
+ }
+ }
+
private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 7cfd051..c6d9d14 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
import java.time.Duration;
import java.util.ArrayList;
@@ -459,6 +461,16 @@ public class MockAdminClient extends AdminClient {
}
@Override
+ public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) {
+ // Client quotas are not supported by this mock: every call throws.
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options) {
+ // Client quotas are not supported by this mock: every call throws.
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
public void close(Duration timeout) {}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 562a91a..de2b593 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,11 +28,12 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
@@ -61,7 +62,8 @@ import scala.collection._
object ConfigCommand extends Config {
val BrokerLoggerConfigType = "broker-loggers"
- val BrokerSupportedConfigTypes = Seq(ConfigType.Topic, ConfigType.Broker, BrokerLoggerConfigType)
+ val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType
+ val ZkSupportedConfigTypes = ConfigType.all
val DefaultScramIterations = 4096
// Dynamic broker configs can only be updated using the new AdminClient once brokers have started
// so that configs may be fully validated. Prior to starting brokers, updates may be performed using
@@ -283,14 +285,8 @@ object ConfigCommand extends Config {
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
val adminClient = Admin.create(props)
- if (opts.entityTypes.size != 1)
- throw new IllegalArgumentException(s"Exactly one entity type (out of ${BrokerSupportedConfigTypes.mkString(",")}) must be specified with --bootstrap-server")
-
- val entityNames = opts.entityNames
- if (entityNames.size > 1)
- throw new IllegalArgumentException(s"At most one entity name must be specified with --bootstrap-server")
- else if (opts.options.has(opts.alterOpt) && entityNames.size != 1)
- throw new IllegalArgumentException(s"Exactly one entity name must be specified with --bootstrap-server for --alter")
+ if (opts.options.has(opts.alterOpt) && opts.entityTypes.size != opts.entityNames.size)
+ throw new IllegalArgumentException(s"An entity name must be specified for every entity type")
try {
if (opts.options.has(opts.alterOpt))
@@ -303,14 +299,17 @@ object ConfigCommand extends Config {
}
private[admin] def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
- val entityType = opts.entityTypes.head
- val entityName = opts.entityNames.head
- val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
+ val entityTypes = opts.entityTypes
+ val entityNames = opts.entityNames
+ val entityTypeHead = entityTypes.head
+ val entityNameHead = entityNames.head
+ val configsToBeAddedMap = parseConfigsToBeAdded(opts).asScala
+ val configsToBeAdded = configsToBeAddedMap.map { case (k, v) => (k, new ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
- entityType match {
+ entityTypeHead match {
case ConfigType.Topic =>
- val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false)
+ val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap
// fail the command if any of the configs to be deleted does not exist
@@ -318,7 +317,7 @@ object ConfigCommand extends Config {
if (invalidConfigs.nonEmpty)
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
- val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityName)
+ val configResource = new ConfigResource(ConfigResource.Type.TOPIC, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
@@ -326,7 +325,7 @@ object ConfigCommand extends Config {
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case ConfigType.Broker =>
- val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false)
+ val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap
// fail the command if any of the configs to be deleted does not exist
@@ -340,38 +339,75 @@ object ConfigCommand extends Config {
throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
val newConfig = new JConfig(newEntries.asJava.values)
- val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case BrokerLoggerConfigType =>
- val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true, describeAll = false).map(_.name)
+ val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
// fail the command if any of the configured broker loggers do not exist
val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
if (invalidBrokerLoggers.nonEmpty)
throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}")
- val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName)
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
- case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityType")
+      // Scala `case` clauses do not fall through. A bare `case ConfigType.User =>`
+      // has an empty body, so user quota alterations would be skipped silently.
+      // A pattern alternative routes both entity types to the quota handling below.
+      case ConfigType.User | ConfigType.Client =>
+        val oldConfig: Map[String, java.lang.Double] = getClientQuotasConfig(adminClient, entityTypes, entityNames)
+
+        // fail the command if any of the configs to be deleted does not exist
+        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+        if (invalidConfigs.nonEmpty)
+          throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+
+        // Translate the command-line entity types to the client quota entity types
+        // and pair each with the entity name given at the same position.
+        val entity = new ClientQuotaEntity(opts.entityTypes.map {
+          case ConfigType.User => ClientQuotaEntity.USER
+          case ConfigType.Client => ClientQuotaEntity.CLIENT_ID
+          case entType => throw new IllegalArgumentException(s"Unexpected entity type: ${entType}")
+        }.zip(opts.entityNames).toMap.asJava)
+
+        val alterOptions = new AlterClientQuotasOptions().validateOnly(false)
+        // Additions carry the parsed numeric value; deletions are expressed as an
+        // Op with a null value.
+        val alterOps = (configsToBeAddedMap.map { case (key, value) =>
+          val doubleValue = try value.toDouble catch {
+            case _: NumberFormatException =>
+              throw new IllegalArgumentException(s"Cannot parse quota configuration value for ${key}: ${value}")
+          }
+          new ClientQuotaAlteration.Op(key, doubleValue)
+        } ++ configsToBeDeleted.map(key => new ClientQuotaAlteration.Op(key, null))).asJavaCollection
+
+        adminClient.alterClientQuotas(Collections.singleton(new ClientQuotaAlteration(entity, alterOps)), alterOptions)
+          .all().get(60, TimeUnit.SECONDS)
+
+ case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
}
- if (entityName.nonEmpty)
- println(s"Completed updating config for ${entityType.dropRight(1)} $entityName.")
+ if (entityNameHead.nonEmpty)
+ println(s"Completed updating config for ${entityTypeHead.dropRight(1)} $entityNameHead.")
else
- println(s"Completed updating default config for $entityType in the cluster.")
+ println(s"Completed updating default config for $entityTypeHead in the cluster.")
}
private[admin] def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
- val entityType = opts.entityTypes.head
- val entityName = opts.entityNames.headOption
+ val entityTypes = opts.entityTypes
+ val entityNames = opts.entityNames
val describeAll = opts.options.has(opts.allOpt)
+    // Dispatch on the first entity type: resource configs go through the config
+    // describe path, user/client entities through the client quotas path.
+    entityTypes.head match {
+      case ConfigType.Topic | ConfigType.Broker | BrokerLoggerConfigType =>
+        describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
+      case ConfigType.User | ConfigType.Client =>
+        describeClientQuotasConfig(adminClient, entityTypes, entityNames)
+      // Report unknown types the same way alterConfig does instead of letting a
+      // scala.MatchError escape.
+      case entityType =>
+        throw new IllegalArgumentException(s"Unsupported entity type: $entityType")
+    }
+ }
+
+ private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = {
val entities = entityName
.map(name => List(name))
.getOrElse(entityType match {
@@ -389,14 +425,14 @@ object ConfigCommand extends Config {
val configSourceStr = if (describeAll) "All" else "Dynamic"
println(s"$configSourceStr configs for ${entityType.dropRight(1)} $entity are:")
}
- getConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry =>
+ getResourceConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry =>
val synonyms = entry.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ")
println(s" ${entry.name}=${entry.value} sensitive=${entry.isSensitive} synonyms={$synonyms}")
}
}
}
- private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = {
+ private def getResourceConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = {
def validateBrokerId(): Unit = try entityName.toInt catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName")
@@ -436,6 +472,36 @@ object ConfigCommand extends Config {
}).toSeq
}
+  /**
+   * Prints one line per quota entity returned by `getAllClientQuotasConfigs`,
+   * listing the entity's user principal and/or client id followed by its
+   * `key=value` quota entries.
+   */
+  private def describeClientQuotasConfig(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
+    for ((quotaEntity, quotaValues) <- getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)) {
+      val parts = quotaEntity.entries.asScala
+      // Either part may be absent; only the ones present are printed.
+      val userPart = parts.get(ClientQuotaEntity.USER).map(user => s"user-principal '${user}'")
+      val clientPart = parts.get(ClientQuotaEntity.CLIENT_ID).map(clientId => s"client-id '${clientId}'")
+      val entityStr = (userPart ++ clientPart).mkString(", ")
+      val entriesStr = quotaValues.asScala.map { case (key, value) => s"${key}=${value}" }.mkString(", ")
+      println(s"Configs for ${entityStr} are ${entriesStr}")
+    }
+  }
+
+  /**
+   * Returns the quota values of the first entity returned by
+   * `getAllClientQuotasConfigs`, or an empty map if there is none.
+   *
+   * @throws IllegalArgumentException if the number of entity names differs from
+   *                                  the number of entity types
+   */
+  private def getClientQuotasConfig(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Map[String, java.lang.Double] = {
+    if (entityTypes.size != entityNames.size)
+      throw new IllegalArgumentException("Exactly one entity name must be specified for every entity type")
+
+    getAllClientQuotasConfigs(adminClient, entityTypes, entityNames).headOption match {
+      case Some((_, quotas)) => quotas.asScala
+      case None => Map.empty
+    }
+  }
+
+  /**
+   * Describes client quotas through the admin client. One filter component is
+   * built per position: `ofEntity` when an entity name is given for the entity
+   * type at that position, `ofEntityType` when only the type is given. The
+   * components are combined with `ClientQuotaFilter.containsOnly` and the call
+   * waits up to 30 seconds for the result.
+   *
+   * @throws IllegalArgumentException for an entity type other than user/client,
+   *                                  or when more names than types are supplied
+   */
+  private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
+    // Maps a command-line entity type to its client quota entity type.
+    def toQuotaEntityType(entityTypeOpt: Option[String]): String = entityTypeOpt match {
+      case Some(ConfigType.User) => ClientQuotaEntity.USER
+      case Some(ConfigType.Client) => ClientQuotaEntity.CLIENT_ID
+      case Some(other) => throw new IllegalArgumentException(s"Unexpected entity type ${other}")
+      case None => throw new IllegalArgumentException("More entity names specified than entity types")
+    }
+
+    // Pad the shorter list with None so every position yields a (type, name) pair.
+    val paired = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None)
+    val components = paired.map { case (typeOpt, nameOpt) =>
+      val quotaEntityType = toQuotaEntityType(typeOpt)
+      nameOpt match {
+        case Some(name) => ClientQuotaFilterComponent.ofEntity(quotaEntityType, name)
+        case None => ClientQuotaFilterComponent.ofEntityType(quotaEntityType)
+      }
+    }
+
+    val filter = ClientQuotaFilter.containsOnly(components.asJava)
+    adminClient.describeClientQuotas(filter).entities.get(30, TimeUnit.SECONDS).asScala
+  }
+
case class Entity(entityType: String, sanitizedName: Option[String]) {
val entityPath = sanitizedName match {
case Some(n) => entityType + "/" + n
@@ -506,7 +572,7 @@ object ConfigCommand extends Config {
val entityTypes = opts.entityTypes
val entityNames = opts.entityNames
if (entityTypes.head == ConfigType.User || entityTypes.head == ConfigType.Client)
- parseQuotaEntity(opts, entityTypes, entityNames)
+ parseClientQuotaEntity(opts, entityTypes, entityNames)
else {
// Exactly one entity type and at-most one entity name expected for other entities
val name = entityNames.headOption match {
@@ -517,7 +583,7 @@ object ConfigCommand extends Config {
}
}
- private def parseQuotaEntity(opts: ConfigCommandOptions, types: List[String], names: List[String]): ConfigEntity = {
+ private def parseClientQuotaEntity(opts: ConfigCommandOptions, types: List[String], names: List[String]): ConfigEntity = {
if (opts.options.has(opts.alterOpt) && names.size != types.size)
throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
@@ -651,7 +717,7 @@ object ConfigCommand extends Config {
val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt))
(BrokerSupportedConfigTypes, "--bootstrap-server")
else
- (ConfigType.all, "--zookeeper")
+ (ZkSupportedConfigTypes, "--zookeeper")
entityTypeVals.foreach(entityTypeVal =>
if (!allowedEntityTypes.contains(entityTypeVal))
@@ -687,9 +753,6 @@ object ConfigCommand extends Config {
}
}
- if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.User))
- CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
-
if (options.has(describeOpt) && entityTypeVals.contains(BrokerLoggerConfigType) && !hasEntityName)
throw new IllegalArgumentException(s"an entity name must be specified with --describe of ${entityTypeVals.mkString(",")}")
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 01c6d41..ea92bfd 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -29,19 +29,21 @@ import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
-import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicConfigs, CreatableTopicResult}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
+import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse}
-import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
-import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
+import org.apache.kafka.common.utils.Sanitizer
import scala.collection.{Map, mutable, _}
import scala.collection.JavaConverters._
@@ -711,4 +713,195 @@ class AdminManager(val config: KafkaConfig,
val readOnly = !DynamicBrokerConfig.AllDynamicConfigs.contains(name)
new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)
}
+
+  /**
+   * Converts a client-supplied entity name into its sanitized form.
+   *
+   * A `null` name is mapped to the sanitized `ConfigEntityName.Default`; supplying that
+   * reserved name explicitly is rejected.
+   *
+   * @throws InvalidRequestException if `entityName` equals `ConfigEntityName.Default`
+   */
+  private def sanitizeEntityName(entityName: String): String =
+    entityName match {
+      case ConfigEntityName.Default =>
+        throw new InvalidRequestException(s"Entity name '${ConfigEntityName.Default}' is reserved")
+      case null => Sanitizer.sanitize(ConfigEntityName.Default)
+      case name => Sanitizer.sanitize(name)
+    }
+
+  /**
+   * Desanitizes a stored entity name, reporting `ConfigEntityName.Default` as `null`.
+   */
+  private def desanitizeEntityName(sanitizedEntityName: String): String = {
+    val entityName = Sanitizer.desanitize(sanitizedEntityName)
+    if (entityName == ConfigEntityName.Default) null else entityName
+  }
+
+  /**
+   * Extracts the sanitized user and client-id names from a quota entity.
+   *
+   * @return `(user, clientId)`; a side is `None` when the entity has no entry of that type
+   * @throws InvalidRequestException if the entity has no entries, contains an entity type other
+   *                                 than user or client-id, or carries a reserved or empty
+   *                                 (non-null) entity name
+   */
+  private def entityToSanitizedUserClientId(entity: ClientQuotaEntity): (Option[String], Option[String]) = {
+    if (entity.entries.isEmpty)
+      throw new InvalidRequestException("Invalid empty client quota entity")
+
+    val nothingSeen = (Option.empty[String], Option.empty[String])
+    entity.entries.asScala.foldLeft(nothingSeen) { case ((user, clientId), (entityType, entityName)) =>
+      // Sanitize first so the reserved-name check runs ahead of the entity type check.
+      val sanitizedEntityName = Some(sanitizeEntityName(entityName))
+      val updated = entityType match {
+        case ClientQuotaEntity.USER => (sanitizedEntityName, clientId)
+        case ClientQuotaEntity.CLIENT_ID => (user, sanitizedEntityName)
+        case _ => throw new InvalidRequestException(s"Unhandled client quota entity type: ${entityType}")
+      }
+      if (entityName != null && entityName.isEmpty)
+        throw new InvalidRequestException(s"Empty ${entityType} not supported")
+      updated
+    }
+  }
+
+  /**
+   * Builds a [[ClientQuotaEntity]] holding a user entry and/or a client-id entry, one for each
+   * argument that is defined.
+   */
+  private def userClientIdToEntity(user: Option[String], clientId: Option[String]): ClientQuotaEntity = {
+    val userEntry = user.map(name => ClientQuotaEntity.USER -> name)
+    val clientIdEntry = clientId.map(name => ClientQuotaEntity.CLIENT_ID -> name)
+    new ClientQuotaEntity((userEntry.toList ++ clientIdEntry.toList).toMap.asJava)
+  }
+
+  /**
+   * Describes the client quotas matching `filter`.
+   *
+   * The filter components are split into at most one user component and one client-id component
+   * and handed to `handleDescribeClientQuotas` together with the filter's strictness.
+   *
+   * @throws InvalidRequestException     if an entity type is given twice or is the empty string
+   * @throws UnsupportedVersionException if a component uses any other entity type
+   */
+  def describeClientQuotas(filter: ClientQuotaFilter): Map[ClientQuotaEntity, Map[String, Double]] = {
+    val noComponent = Option.empty[ClientQuotaFilterComponent]
+    val (userComponent, clientIdComponent) =
+      filter.components.asScala.foldLeft((noComponent, noComponent)) { case ((userSeen, clientIdSeen), component) =>
+        component.entityType match {
+          case ClientQuotaEntity.USER =>
+            if (userSeen.isDefined)
+              throw new InvalidRequestException("Duplicate user filter component entity type")
+            (Some(component), clientIdSeen)
+          case ClientQuotaEntity.CLIENT_ID =>
+            if (clientIdSeen.isDefined)
+              throw new InvalidRequestException("Duplicate client filter component entity type")
+            (userSeen, Some(component))
+          case "" =>
+            throw new InvalidRequestException("Unexpected empty filter component entity type")
+          case et =>
+            // Supplying other entity types is not yet supported.
+            throw new UnsupportedVersionException(s"Custom entity type '${et}' not supported")
+        }
+      }
+    handleDescribeClientQuotas(userComponent, clientIdComponent, filter.strict)
+  }
+
+ /**
+ * Collects the quota configurations that match the given user / client-id filter components.
+ *
+ * Candidate entries are fetched per entity shape (user only, client-id only, user + client-id)
+ * and then filtered with `matches`; entities whose stored properties are empty are dropped.
+ *
+ * @param userComponent filter component for the user entity type, if one was supplied
+ * @param clientIdComponent filter component for the client-id entity type, if one was supplied
+ * @param strict when true, an entity type without a filter component must be absent from a matching entity
+ * @return the matching entities, each mapped to its quota values parsed as doubles
+ */
+ def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent],
+ clientIdComponent: Option[ClientQuotaFilterComponent], strict: Boolean) = {
+
+ // Maps a component's match field: null -> None, a present name -> Some(name), an empty Optional -> Some(null).
+ // NOTE(review): Some(null) presumably denotes the default entity, since sanitizeEntityName maps null to the default name - confirm.
+ def toOption(opt: java.util.Optional[String]): Option[String] =
+ if (opt == null)
+ None
+ else if (opt.isPresent)
+ Some(opt.get)
+ else
+ Some(null)
+
+ val user = userComponent.flatMap(c => toOption(c.`match`))
+ val clientId = clientIdComponent.flatMap(c => toOption(c.`match`))
+
+ // Sanitized names are only used by the single-entity fetches below; "" when no name applies.
+ def sanitized(name: Option[String]): String = name.map(n => sanitizeEntityName(n)).getOrElse("")
+ val sanitizedUser = sanitized(user)
+ val sanitizedClientId = sanitized(clientId)
+
+ // "Exact" means a component was supplied and its match field is non-null.
+ def wantExact(component: Option[ClientQuotaFilterComponent]): Boolean = component.exists(_.`match` != null)
+ val exactUser = wantExact(userComponent)
+ val exactClientId = wantExact(clientIdComponent)
+
+ // In strict mode an entity type that has no component is excluded from the results.
+ def wantExcluded(component: Option[ClientQuotaFilterComponent]): Boolean = strict && !component.isDefined
+ val excludeUser = wantExcluded(userComponent)
+ val excludeClientId = wantExcluded(clientIdComponent)
+
+ // User-only entries: one direct fetch for an exact user with client-id excluded; otherwise all
+ // user configs, unless the user type is excluded or an exact client-id is required.
+ val userEntries = if (exactUser && excludeClientId)
+ Map(((Some(user.get), None) -> adminZkClient.fetchEntityConfig(ConfigType.User, sanitizedUser)))
+ else if (!excludeUser && !exactClientId)
+ adminZkClient.fetchAllEntityConfigs(ConfigType.User).map { case (name, props) =>
+ ((Some(desanitizeEntityName(name)), None) -> props)
+ }
+ else
+ Map.empty
+
+ // Client-id-only entries: the mirror image of the user-only case above.
+ val clientIdEntries = if (excludeUser && exactClientId)
+ Map(((None, Some(clientId.get)) -> adminZkClient.fetchEntityConfig(ConfigType.Client, sanitizedClientId)))
+ else if (!exactUser && !excludeClientId)
+ adminZkClient.fetchAllEntityConfigs(ConfigType.Client).map { case (name, props) =>
+ ((None, Some(desanitizeEntityName(name))) -> props)
+ }
+ else
+ Map.empty
+
+ // User + client-id entries are read from the user config type at "<user>/clients/<clientId>".
+ val bothEntries = if (exactUser && exactClientId)
+ Map(((Some(user.get), Some(clientId.get)) ->
+ adminZkClient.fetchEntityConfig(ConfigType.User, s"${sanitizedUser}/clients/${sanitizedClientId}")))
+ else if (!excludeUser && !excludeClientId)
+ adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).map { case (name, props) =>
+ val components = name.split("/")
+ if (components.size != 3 || components(1) != "clients")
+ throw new IllegalArgumentException(s"Unexpected config path: ${name}")
+ ((Some(desanitizeEntityName(components(0))), Some(desanitizeEntityName(components(2)))) -> props)
+ }
+ else
+ Map.empty
+
+ // Checks one side of an entity key against its filter component: a named component requires that
+ // name, an unnamed component requires the side to be present, and no component accepts an absent
+ // side (or any side when not strict).
+ def matches(nameComponent: Option[ClientQuotaFilterComponent], name: Option[String]): Boolean = nameComponent match {
+ case Some(component) =>
+ toOption(component.`match`) match {
+ case Some(n) => name.exists(_ == n)
+ case None => name.isDefined
+ }
+ case None =>
+ !name.isDefined || !strict
+ }
+
+ // Parses every stored property value as a double; a non-numeric value is reported as an illegal state.
+ def fromProps(props: Properties): Map[String, Double] = {
+ props.asScala.map { case (key, value) =>
+ val doubleValue = try value.toDouble catch {
+ case _: NumberFormatException =>
+ throw new IllegalStateException(s"Unexpected client quota configuration value: ${key} -> ${value}")
+ }
+ (key -> doubleValue)
+ }
+ }
+
+ // Keep only candidates that have properties and satisfy both components.
+ (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
+ if (!p.isEmpty && matches(userComponent, u) && matches(clientIdComponent, c))
+ Some((userClientIdToEntity(u, c) -> fromProps(p)))
+ else
+ None
+ }.flatten.toMap
+ }
+
+  /**
+   * Applies a batch of client quota alterations, one entity at a time.
+   *
+   * For each entity the current configuration is fetched, the ops are applied to it (a `null`
+   * value removes the key) and, unless `validateOnly` is set, the result is written back.
+   *
+   * @param entries      the alterations to apply
+   * @param validateOnly when true, the alterations are validated but nothing is written
+   * @return the outcome per entity: `ApiError.NONE` on success, otherwise the error derived from
+   *         the failure of that entity's alteration
+   */
+  def alterClientQuotas(entries: Seq[ClientQuotaAlteration], validateOnly: Boolean): Map[ClientQuotaEntity, ApiError] = {
+    def alterEntityQuotas(entity: ClientQuotaEntity, ops: Iterable[ClientQuotaAlteration.Op]): Unit = {
+      val (path, configType, configKeys) = entityToSanitizedUserClientId(entity) match {
+        case (Some(user), Some(clientId)) => (user + "/clients/" + clientId, ConfigType.User, DynamicConfig.User.configKeys)
+        case (Some(user), None) => (user, ConfigType.User, DynamicConfig.User.configKeys)
+        case (None, Some(clientId)) => (clientId, ConfigType.Client, DynamicConfig.Client.configKeys)
+        case _ => throw new InvalidRequestException("Invalid empty client quota entity")
+      }
+
+      val props = adminZkClient.fetchEntityConfig(configType, path)
+      ops.foreach { op =>
+        op.value match {
+          case null =>
+            props.remove(op.key)
+          case value => configKeys.get(op.key) match {
+            case null =>
+              throw new InvalidRequestException(s"Invalid configuration key ${op.key}")
+            case key => key.`type` match {
+              case ConfigDef.Type.DOUBLE =>
+                props.setProperty(op.key, value.toString)
+              case ConfigDef.Type.LONG =>
+                // Quota values arrive as doubles; accept them for a LONG key only when they are
+                // within epsilon of a whole number. Rounding (instead of adding epsilon and
+                // truncating) treats negative values symmetrically, and NaN is rejected
+                // explicitly because it would otherwise convert to 0 and be stored as such.
+                val epsilon = 1e-6
+                val requested = value.doubleValue
+                val longValue = math.round(requested)
+                if (requested.isNaN || (longValue.toDouble - requested).abs > epsilon)
+                  throw new InvalidRequestException(s"Configuration ${op.key} must be a Long value")
+                props.setProperty(op.key, longValue.toString)
+              case _ =>
+                throw new IllegalStateException(s"Unexpected config type ${key.`type`}")
+            }
+          }
+        }
+      }
+      if (!validateOnly)
+        adminZkClient.changeConfigs(configType, path, props)
+    }
+
+    entries.map { entry =>
+      // A failure is reported for its own entity only; the remaining entries are still processed.
+      val apiError = try {
+        alterEntityQuotas(entry.entity, entry.ops.asScala)
+        ApiError.NONE
+      } catch {
+        case e: Throwable =>
+          info("Error encountered while updating client quotas", e)
+          ApiError.fromThrowable(e)
+      }
+      (entry.entity -> apiError)
+    }.toMap
+  }
}
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index e5974d3..abea5de 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -89,6 +89,8 @@ object DynamicConfig {
.define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
.define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
+ // Key definitions of the client-id quota configs; AdminManager.alterClientQuotas looks up each key's value type here.
+ def configKeys = clientConfigs.configKeys
+
def names = clientConfigs.names
def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
@@ -102,6 +104,8 @@ object DynamicConfig {
.define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
.define(Client.RequestPercentageOverrideProp, DOUBLE, Client.DefaultRequestOverride, MEDIUM, Client.RequestOverrideDoc)
+ // Key definitions of the user quota configs; AdminManager.alterClientQuotas looks up each key's value type here.
+ def configKeys = userConfigs.configKeys
+
def names = userConfigs.names
def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d092cdd..4f12473 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -174,6 +174,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
+ case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
+ case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
}
} catch {
case e: FatalExitError => throw e
@@ -2813,6 +2815,34 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+  /**
+   * Handles a DescribeClientQuotas request: when the caller is authorized for DESCRIBE_CONFIGS on
+   * the cluster, responds with the quotas returned by the admin manager (values boxed for the
+   * response); otherwise responds with a cluster authorization failure.
+   */
+  def handleDescribeClientQuotasRequest(request: RequestChannel.Request): Unit = {
+    val describeClientQuotasRequest = request.body[DescribeClientQuotasRequest]
+
+    if (!authorize(request, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+      sendResponseMaybeThrottle(request, throttleTimeMs =>
+        describeClientQuotasRequest.getErrorResponse(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+    } else {
+      val quotasByEntity = adminManager.describeClientQuotas(describeClientQuotasRequest.filter)
+      val result = quotasByEntity.mapValues(_.mapValues(Double.box).asJava).asJava
+      sendResponseMaybeThrottle(request, throttleTimeMs =>
+        new DescribeClientQuotasResponse(result, throttleTimeMs))
+    }
+  }
+
+  /**
+   * Handles an AlterClientQuotas request: when the caller is authorized for ALTER_CONFIGS on the
+   * cluster, applies the request's entries through the admin manager and responds with the
+   * per-entity results; otherwise responds with a cluster authorization failure.
+   */
+  def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
+    val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]
+
+    if (!authorize(request, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+      sendResponseMaybeThrottle(request, throttleTimeMs =>
+        alterClientQuotasRequest.getErrorResponse(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+    } else {
+      val errorsByEntity = adminManager.alterClientQuotas(alterClientQuotasRequest.entries().asScala.toSeq,
+        alterClientQuotasRequest.validateOnly())
+      val result = errorsByEntity.asJava
+      sendResponseMaybeThrottle(request, throttleTimeMs =>
+        new AlterClientQuotasResponse(result, throttleTimeMs))
+    }
+  }
+
private def authorize(request: RequestChannel.Request,
operation: AclOperation,
resourceType: ResourceType,
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index e938a6d..5a76f75 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
@@ -92,8 +93,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+  // Argument parsing for the "clients" entity type with zkConfig = false.
+  def shouldParseArgumentsForClientsEntityType(): Unit =
+    testArgumentParse("clients", zkConfig = false)
+
+ @Test
def shouldParseArgumentsForUsersEntityTypeUsingZookeeper(): Unit = {
- testArgumentParse("clients", zkConfig = true)
+ testArgumentParse("users", zkConfig = true)
+ }
+
+ @Test
+  // Argument parsing for the "users" entity type with zkConfig = false.
+  def shouldParseArgumentsForUsersEntityType(): Unit =
+    testArgumentParse("users", zkConfig = false)
@Test
@@ -215,10 +226,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
assertTrue(addedProps2.getProperty("f").isEmpty)
}
- @Test
- def testOptionEntityTypeNames(): Unit = {
+ def doTestOptionEntityTypeNames(zkConfig: Boolean): Unit = {
+ val connectOpts = if (zkConfig)
+ ("--zookeeper", zkConnect)
+ else
+ ("--bootstrap-server", "localhost:9092")
+
def testExpectedEntityTypeNames(expectedTypes: List[String], expectedNames: List[String], args: String*): Unit = {
- val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, "--describe") ++ args)
+ val createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, "--describe") ++ args)
createOpts.checkArgs()
assertEquals(createOpts.entityTypes, expectedTypes)
assertEquals(createOpts.entityNames, expectedNames)
@@ -243,6 +258,16 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
testExpectedEntityTypeNames(List(ConfigType.Broker), List.empty, "--entity-type", "brokers")
}
+ @Test
+  // Runs the shared entity type/name option checks with zkConfig = true.
+  def testOptionEntityTypeNamesUsingZookeeper(): Unit =
+    doTestOptionEntityTypeNames(zkConfig = true)
+
+ @Test
+  // Runs the shared entity type/name option checks with zkConfig = false.
+  def testOptionEntityTypeNames(): Unit =
+    doTestOptionEntityTypeNames(zkConfig = false)
+
@Test(expected = classOf[IllegalArgumentException])
def shouldFailIfUnrecognisedEntityTypeUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -292,6 +317,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
createOpts.checkArgs()
}
+ @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailIfMixedEntityTypeFlags(): Unit = {
+    // Mixes --entity-type/--entity-name with the --client flag; the test expects checkArgs()
+    // to throw IllegalArgumentException.
+    val args = Array("--bootstrap-server", "localhost:9092",
+      "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe")
+    new ConfigCommandOptions(args).checkArgs()
+  }
+
@Test
def shouldAddClientConfigUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -312,6 +344,66 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+ // Verifies that altering a client-id's quotas through the admin client first describes the
+ // entity's quotas and then submits the expected alteration ops.
+ def shouldAddClientConfig(): Unit = {
+ val createOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+ "--entity-name", "my-client-id",
+ "--entity-type", "clients",
+ "--alter",
+ "--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000",
+ "--delete-config", "request_percentage"))
+
+ val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> "my-client-id")).asJava)
+
+ // Canned describe result: the entity currently has request_percentage = 50.0.
+ var describedConfigs = false
+ val describeFuture = new KafkaFutureImpl[util.Map[ClientQuotaEntity, util.Map[String, java.lang.Double]]]
+ describeFuture.complete(Map((entity -> Map(("request_percentage" -> Double.box(50.0))).asJava)).asJava)
+ val describeResult: DescribeClientQuotasResult = EasyMock.createNiceMock(classOf[DescribeClientQuotasResult])
+ EasyMock.expect(describeResult.entities()).andReturn(describeFuture)
+
+ // Canned alter result: an already completed future.
+ var alteredConfigs = false
+ val alterFuture = new KafkaFutureImpl[Void]
+ alterFuture.complete(null)
+ val alterResult: AlterClientQuotasResult = EasyMock.createNiceMock(classOf[AlterClientQuotasResult])
+ EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+
+ // Mock admin client that asserts on the filter and the alterations it receives.
+ val node = new Node(1, "localhost", 9092)
+ val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+ override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult = {
+ // Expect a strict filter with a single component naming exactly this client-id.
+ assertEquals(1, filter.components.size)
+ assertTrue(filter.strict)
+ val component = filter.components.asScala.head
+ assertEquals(ClientQuotaEntity.CLIENT_ID, component.entityType)
+ assertTrue(component.`match`.isPresent)
+ assertEquals("my-client-id", component.`match`.get)
+ describedConfigs = true
+ describeResult
+ }
+
+ override def alterClientQuotas(entries: util.Collection[ClientQuotaAlteration], options: AlterClientQuotasOptions): AlterClientQuotasResult = {
+ assertFalse(options.validateOnly)
+ assertEquals(1, entries.size)
+ val alteration = entries.asScala.head
+ assertEquals(entity, alteration.entity)
+ val ops = alteration.ops.asScala
+ assertEquals(3, ops.size)
+ // Two additions plus one removal (null value) for request_percentage.
+ val expectedOps = Set(
+ new ClientQuotaAlteration.Op("consumer_byte_rate", Double.box(20000)),
+ new ClientQuotaAlteration.Op("producer_byte_rate", Double.box(10000)),
+ new ClientQuotaAlteration.Op("request_percentage", null)
+ )
+ assertEquals(expectedOps, ops.toSet)
+ alteredConfigs = true
+ alterResult
+ }
+ }
+ EasyMock.replay(alterResult, describeResult)
+ ConfigCommand.alterConfig(mockAdminClient, createOpts)
+ // Both admin client calls must have been made.
+ assertTrue(describedConfigs)
+ assertTrue(alteredConfigs)
+ EasyMock.reset(alterResult, describeResult)
+ }
+
+ @Test
def shouldAddTopicConfigUsingZookeeper(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "my-topic",
@@ -935,12 +1027,14 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
ConfigCommand.alterConfigWithZk(null, del512, CredentialChange("userB", Set(), 4096))
}
- @Test
- def testQuotaConfigEntity(): Unit = {
+ def doTestQuotaConfigEntity(zkConfig: Boolean): Unit = {
+ val connectOpts = if (zkConfig)
+ ("--zookeeper", zkConnect)
+ else
+ ("--bootstrap-server", "localhost:9092")
def createOpts(entityType: String, entityName: Option[String], otherArgs: Array[String]) : ConfigCommandOptions = {
- val optArray = Array("--zookeeper", zkConnect,
- "--entity-type", entityType)
+ val optArray = Array(connectOpts._1, connectOpts._2, "--entity-type", entityType)
val nameArray = entityName match {
case Some(name) => Array("--entity-name", name)
case None => Array[String]()
@@ -1008,9 +1102,23 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
- def testUserClientQuotaOpts(): Unit = {
+  // Runs the shared quota config entity checks with zkConfig = true.
+  def testQuotaConfigEntityUsingZookeeper(): Unit =
+    doTestQuotaConfigEntity(zkConfig = true)
+
+ @Test
+  // Runs the shared quota config entity checks with zkConfig = false.
+  def testQuotaConfigEntity(): Unit =
+    doTestQuotaConfigEntity(zkConfig = false)
+
+ def doTestUserClientQuotaOpts(zkConfig: Boolean): Unit = {
+ val connectOpts = if (zkConfig)
+ ("--zookeeper", zkConnect)
+ else
+ ("--bootstrap-server", "localhost:9092")
+
def checkEntity(expectedEntityType: String, expectedEntityName: String, args: String*): Unit = {
- val opts = new ConfigCommandOptions(Array("--zookeeper", zkConnect) ++ args)
+ val opts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2) ++ args)
opts.checkArgs()
val entity = ConfigCommand.parseEntity(opts)
assertEquals(expectedEntityType, entity.root.entityType)
@@ -1025,7 +1133,6 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "clients", "--entity-name", "<default>",
"--alter", "--add-config", "a=b,c=d")
-
checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1",
"--entity-type", "users", "--entity-name", "CN=user1", "--entity-type", "clients", "--entity-name", "client1",
"--alter", "--add-config", "a=b,c=d")
@@ -1050,6 +1157,16 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+  // Runs the shared user/client quota option checks with zkConfig = true.
+  def testUserClientQuotaOptsUsingZookeeper(): Unit =
+    doTestUserClientQuotaOpts(zkConfig = true)
+
+ @Test
+  // Runs the shared user/client quota option checks with zkConfig = false.
+  def testUserClientQuotaOpts(): Unit =
+    doTestUserClientQuotaOpts(zkConfig = false)
+
+ @Test
def testQuotaDescribeEntities(): Unit = {
val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
@@ -1134,5 +1251,10 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
options: AlterConfigsOptions): AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
override def alterConfigs(configs: util.Map[ConfigResource, Config], options: AlterConfigsOptions): AlterConfigsResult =
EasyMock.createNiceMock(classOf[AlterConfigsResult])
+ // Stub: ignores its arguments and returns a fresh nice mock result.
+ override def describeClientQuotas(filter: ClientQuotaFilter, options: DescribeClientQuotasOptions): DescribeClientQuotasResult =
+ EasyMock.createNiceMock(classOf[DescribeClientQuotasResult])
+ // Stub: ignores its arguments and returns a fresh nice mock result.
+ override def alterClientQuotas(entries: util.Collection[ClientQuotaAlteration],
+ options: AlterClientQuotasOptions): AlterClientQuotasResult =
+ EasyMock.createNiceMock(classOf[AlterClientQuotasResult])
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
new file mode 100644
index 0000000..047ff71
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
+import org.apache.kafka.common.internals.KafkaFutureImpl
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+class ClientQuotasRequestTest extends BaseRequestTest {
+ // Quota configuration keys exercised by these tests, taken from DynamicConfig.Client.
+ private val ConsumerByteRateProp = DynamicConfig.Client.ConsumerByteRateOverrideProp
+ private val ProducerByteRateProp = DynamicConfig.Client.ProducerByteRateOverrideProp
+ private val RequestPercentageProp = DynamicConfig.Client.RequestPercentageOverrideProp
+
+ // Overrides the broker count used by the test harness to 1.
+ override val brokerCount = 1
+
+ @Test
+  def testAlterClientQuotasRequest(): Unit = {
+    val entity = new ClientQuotaEntity(
+      Map(ClientQuotaEntity.USER -> "user", ClientQuotaEntity.CLIENT_ID -> "client-id").asJava)
+
+    // Applies one alteration for real (not validate-only) and checks the configuration
+    // described afterwards.
+    def alterAndVerify(alterations: Map[String, Option[Double]], expected: Map[String, Double]): Unit = {
+      alterEntityQuotas(entity, alterations, validateOnly = false)
+      verifyDescribeEntityQuotas(entity, expected)
+    }
+
+    // Expect an empty configuration.
+    verifyDescribeEntityQuotas(entity, Map.empty)
+
+    // Add two configuration entries.
+    alterAndVerify(
+      Map(ProducerByteRateProp -> Some(10000.0), ConsumerByteRateProp -> Some(20000.0)),
+      Map(ProducerByteRateProp -> 10000.0, ConsumerByteRateProp -> 20000.0))
+
+    // Update an existing entry.
+    alterAndVerify(
+      Map(ProducerByteRateProp -> Some(15000.0)),
+      Map(ProducerByteRateProp -> 15000.0, ConsumerByteRateProp -> 20000.0))
+
+    // Remove an existing configuration entry.
+    alterAndVerify(
+      Map(ProducerByteRateProp -> None),
+      Map(ConsumerByteRateProp -> 20000.0))
+
+    // Remove a non-existent configuration entry. This should make no changes.
+    alterAndVerify(
+      Map(RequestPercentageProp -> None),
+      Map(ConsumerByteRateProp -> 20000.0))
+
+    // Add back a deleted configuration entry.
+    alterAndVerify(
+      Map(ProducerByteRateProp -> Some(5000.0)),
+      Map(ProducerByteRateProp -> 5000.0, ConsumerByteRateProp -> 20000.0))
+
+    // Perform a mixed update.
+    alterAndVerify(
+      Map(
+        ProducerByteRateProp -> Some(20000.0),
+        ConsumerByteRateProp -> None,
+        RequestPercentageProp -> Some(12.3)),
+      Map(ProducerByteRateProp -> 20000.0, RequestPercentageProp -> 12.3))
+  }
+
+ @Test
+  def testAlterClientQuotasRequestValidateOnly(): Unit = {
+    val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user").asJava)
+
+    // The configuration written during set-up; no validate-only request may change it.
+    val committed = Map(ProducerByteRateProp -> 20000.0, RequestPercentageProp -> 23.45)
+
+    // Sends a validate-only alteration and checks that the committed configuration is untouched.
+    def validateAndVerifyUnchanged(alterations: Map[String, Option[Double]]): Unit = {
+      alterEntityQuotas(entity, alterations, validateOnly = true)
+      verifyDescribeEntityQuotas(entity, committed)
+    }
+
+    // Set up a configuration.
+    alterEntityQuotas(entity, Map(
+      ProducerByteRateProp -> Some(20000.0),
+      RequestPercentageProp -> Some(23.45)
+    ), validateOnly = false)
+    verifyDescribeEntityQuotas(entity, committed)
+
+    // Validate-only addition.
+    validateAndVerifyUnchanged(Map(ConsumerByteRateProp -> Some(50000.0)))
+
+    // Validate-only modification.
+    validateAndVerifyUnchanged(Map(ProducerByteRateProp -> Some(10000.0)))
+
+    // Validate-only removal.
+    validateAndVerifyUnchanged(Map(RequestPercentageProp -> None))
+
+    // Validate-only mixed update.
+    validateAndVerifyUnchanged(Map(
+      ProducerByteRateProp -> Some(10000.0),
+      ConsumerByteRateProp -> Some(50000.0),
+      RequestPercentageProp -> None
+    ))
+  }
+
+ @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadUser(): Unit = {
+    // An empty (non-null) user name; the test expects InvalidRequestException.
+    val entries = Map(ClientQuotaEntity.USER -> "")
+    val entity = new ClientQuotaEntity(entries.asJava)
+    alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true)
+  }
+
+ @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadClientId(): Unit = {
+    // An empty (non-null) client id; the test expects InvalidRequestException.
+    val entries = Map(ClientQuotaEntity.CLIENT_ID -> "")
+    val entity = new ClientQuotaEntity(entries.asJava)
+    alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true)
+  }
+
+ @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadEntityType(): Unit = {
+    // An empty entity type; the test expects InvalidRequestException.
+    val entries = Map("" -> "name")
+    val entity = new ClientQuotaEntity(entries.asJava)
+    alterEntityQuotas(entity, Map(RequestPercentageProp -> Some(12.34)), validateOnly = true)
+  }
+
+ @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasEmptyEntity(): Unit = {
+    // An entity without any entries; the test expects InvalidRequestException.
+    val entries = Map.empty[String, String]
+    val entity = new ClientQuotaEntity(entries.asJava)
+    alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true)
+  }
+
+ @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadConfigKey(): Unit = {
+    // A configuration key that is not a known quota config; the test expects InvalidRequestException.
+    val entries = Map(ClientQuotaEntity.USER -> "user")
+    val entity = new ClientQuotaEntity(entries.asJava)
+    alterEntityQuotas(entity, Map("bad" -> Some(1.0)), validateOnly = true)
+  }
+
+ @Test(expected = classOf[InvalidRequestException])
+  def testAlterClientQuotasBadConfigValue(): Unit = {
+    // A fractional value for the producer byte rate; the test expects InvalidRequestException.
+    val entries = Map(ClientQuotaEntity.USER -> "user")
+    val entity = new ClientQuotaEntity(entries.asJava)
+    alterEntityQuotas(entity, Map(ProducerByteRateProp -> Some(10000.5)), validateOnly = true)
+  }
+
+  // Entities to be matched against, each paired with the request percentage that
+  // setupDescribeClientQuotasMatchTest() assigns to it. In the user / client-id columns,
+  // None leaves that entity type out and Some(null) passes a null name to toEntity.
+  private val matchEntities = List(
+    (Some("user-1"), Some("client-id-1"), 50.50),
+    (Some("user-2"), Some("client-id-1"), 51.51),
+    (Some("user-3"), Some("client-id-2"), 52.52),
+    (Some(null), Some("client-id-1"), 53.53),
+    (Some("user-1"), Some(null), 54.54),
+    (Some("user-3"), Some(null), 55.55),
+    (Some("user-1"), None, 56.56),
+    (Some("user-2"), None, 57.57),
+    (Some("user-3"), None, 58.58),
+    (Some(null), None, 59.59),
+    (None, Some("client-id-2"), 60.60)
+  ).map { case (user, clientId, requestPercentage) => (toEntity(user, clientId), requestPercentage) }
+
+  // Writes the request-percentage quota of every entity in `matchEntities` and waits (up to
+  // 10 seconds each) for the alterations to complete.
+  private def setupDescribeClientQuotasMatchTest() = {
+    val futures = alterClientQuotas(matchEntities.map { case (entity, requestPercentage) =>
+      (entity -> Map((RequestPercentageProp, Some(requestPercentage))))
+    }.toMap, validateOnly = false)
+    matchEntities.foreach { case (entity, _) =>
+      futures.get(entity).get.get(10, TimeUnit.SECONDS)
+    }
+
+    // Allow time for watch callbacks to be triggered.
+    Thread.sleep(500)
+  }
+
+ @Test
+  def testDescribeClientQuotasMatchExact(): Unit = {
+    setupDescribeClientQuotasMatchTest()
+
+    // Describes with a `containsOnly` filter that names every component of `entity`; a null
+    // entity name is turned into a default-entity component.
+    def matchEntity(entity: ClientQuotaEntity) = {
+      val components = entity.entries.asScala.map { case (entityType, entityName) =>
+        entityName match {
+          case null => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
+          case name => ClientQuotaFilterComponent.ofEntity(entityType, name)
+        }
+      }
+      describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
+    }
+
+    // Test exact matches: each configured entity must come back alone, with its configured value.
+    matchEntities.foreach { case (e, v) =>
+      val result = matchEntity(e)
+      assertEquals(1, result.size)
+      assertNotNull(result.get(e))
+      val value = result.get(e).get(RequestPercentageProp)
+      assertNotNull(value)
+      // JUnit expects (expected, actual, delta); this order keeps failure messages accurate.
+      assertEquals(v, value, 1e-6)
+    }
+
+    // Entities not contained in `matchEntities`.
+    val notMatchEntities = List(
+      (Some("user-1"), Some("client-id-2")),
+      (Some("user-3"), Some("client-id-1")),
+      (Some("user-2"), Some(null)),
+      (Some("user-4"), None),
+      (Some(null), Some("client-id-2")),
+      (None, Some("client-id-1")),
+      (None, Some("client-id-3")),
+    ).map { case (u, c) =>
+      new ClientQuotaEntity((u.map((ClientQuotaEntity.USER, _)) ++
+        c.map((ClientQuotaEntity.CLIENT_ID, _))).toMap.asJava)
+    }
+
+    // Verify exact matches of the non-matches returns empty.
+    notMatchEntities.foreach { e =>
+      val result = matchEntity(e)
+      assertEquals(0, result.size)
+    }
+  }
+
+ /**
+  * Checks that partial filters return exactly the expected subset of `matchEntities`, for both
+  * open-ended (`ClientQuotaFilter.contains`) and close-ended (`ClientQuotaFilter.containsOnly`) matches.
+  */
+ @Test
+ def testDescribeClientQuotasMatchPartial(): Unit = {
+   // NOTE(review): presumably installs the quotas listed in `matchEntities`; the setup is defined
+   // outside this section, so confirm there.
+   setupDescribeClientQuotasMatchTest()
+
+   // Describes the quotas selected by `filter` and compares them with the entries of `matchEntities`
+   // accepted by the `partition` predicate. `expectedMatchSize` is asserted against both the
+   // predicate's selection (to validate the test itself) and the describe result.
+   def testMatchEntities(filter: ClientQuotaFilter, expectedMatchSize: Int, partition: ClientQuotaEntity => Boolean): Unit = {
+     val result = describeClientQuotas(filter)
+     val expectedMatches = matchEntities.filter { case (entity, _) => partition(entity) }
+     assertEquals(expectedMatchSize, expectedMatches.size) // for test verification
+     assertEquals(expectedMatchSize, result.size)
+     val expectedMatchesMap = expectedMatches.toMap
+     matchEntities.foreach { case (entity, expectedValue) =>
+       if (expectedMatchesMap.contains(entity)) {
+         val config = result.get(entity)
+         assertTrue(config != null)
+         val value = config.get(RequestPercentageProp)
+         assertTrue(value != null)
+         assertEquals(expectedValue, value, 1e-6)
+       } else {
+         // Entities rejected by the predicate must be absent from the describe result.
+         assertTrue(result.get(entity) == null)
+       }
+     }
+   }
+
+   // Match open-ended existing user.
+   testMatchEntities(
+     ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-1")).asJava), 3,
+     entity => entity.entries.get(ClientQuotaEntity.USER) == "user-1"
+   )
+
+   // Match open-ended non-existent user.
+   testMatchEntities(
+     ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "unknown")).asJava), 0,
+     _ => false
+   )
+
+   // Match open-ended existing client ID.
+   testMatchEntities(
+     ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-2")).asJava), 2,
+     entity => entity.entries.get(ClientQuotaEntity.CLIENT_ID) == "client-id-2"
+   )
+
+   // Match open-ended default user. A default entity is stored as a present key with a null name.
+   testMatchEntities(
+     ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER)).asJava), 2,
+     entity => entity.entries.containsKey(ClientQuotaEntity.USER) && entity.entries.get(ClientQuotaEntity.USER) == null
+   )
+
+   // Match close-ended existing user.
+   testMatchEntities(
+     ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user-2")).asJava), 1,
+     entity => entity.entries.get(ClientQuotaEntity.USER) == "user-2" && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
+   )
+
+   // Match close-ended existing client ID that has no matching entity.
+   testMatchEntities(
+     ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID, "client-id-1")).asJava), 0,
+     _ => false
+   )
+
+   // Match against all entities with the user type in a close-ended match.
+   testMatchEntities(
+     ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 4,
+     entity => entity.entries.containsKey(ClientQuotaEntity.USER) && !entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
+   )
+
+   // Match against all entities with the user type in an open-ended match.
+   testMatchEntities(
+     ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava), 10,
+     entity => entity.entries.containsKey(ClientQuotaEntity.USER)
+   )
+
+   // Match against all entities with the client ID type in a close-ended match.
+   testMatchEntities(
+     ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava), 1,
+     entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) && !entity.entries.containsKey(ClientQuotaEntity.USER)
+   )
+
+   // Match against all entities with the client ID type in an open-ended match.
+   testMatchEntities(
+     ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava), 7,
+     entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
+   )
+
+   // Match open-ended empty filter list. This should match all entities.
+   testMatchEntities(ClientQuotaFilter.contains(List.empty.asJava), 11, _ => true)
+
+   // Match close-ended empty filter list. This should match no entities.
+   testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, _ => false)
+ }
+
+ /**
+  * Describes quotas for an entity whose type ("other") is neither user nor client ID.
+  *
+  * The test passes when the describe either returns no quotas or fails with an
+  * `ExecutionException` whose cause is an `UnsupportedVersionException`.
+  */
+ @Test
+ def testClientQuotasUnsupportedEntityTypes(): Unit = {
+   val entity = new ClientQuotaEntity(Map("other" -> "name").asJava)
+   try {
+     // An empty expectation makes the helper assert that nothing is returned for the entity.
+     verifyDescribeEntityQuotas(entity, Map())
+   } catch {
+     case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException])
+   }
+ }
+
+ /**
+  * Round-trips a quota for a user whose name contains spaces: the quota is altered and then
+  * described back under the same, unsanitized entity.
+  */
+ @Test
+ def testClientQuotasSanitized(): Unit = {
+   // The name must be sanitized when writing to Zookeeper.
+   val userName = "user with spaces"
+   val sanitizedEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)
+   val producerRate = 20000.0
+
+   alterEntityQuotas(sanitizedEntity, Map(ProducerByteRateProp -> Some(producerRate)), validateOnly = false)
+
+   verifyDescribeEntityQuotas(sanitizedEntity, Map(ProducerByteRateProp -> producerRate))
+ }
+
+ /**
+  * Describes the quotas of exactly `entity` (close-ended match) and asserts they equal `quotas`.
+  *
+  * @param entity the entity to describe; an entry with a null name denotes the default entity
+  * @param quotas the expected quota values by key; an empty map asserts that nothing is returned
+  */
+ private def verifyDescribeEntityQuotas(entity: ClientQuotaEntity, quotas: Map[String, Double]) = {
+   // A null name cannot be expressed as a named-entity component, so it is mapped to the
+   // default-entity component instead.
+   val components = entity.entries.asScala.map { case (entityType, entityName) =>
+     Option(entityName)
+       .map(name => ClientQuotaFilterComponent.ofEntity(entityType, name))
+       .getOrElse(ClientQuotaFilterComponent.ofDefaultEntity(entityType))
+   }
+   val describe = describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
+   if (quotas.isEmpty) {
+     assertEquals(0, describe.size)
+   } else {
+     assertEquals(1, describe.size)
+     val configs = describe.get(entity)
+     assertTrue(configs != null)
+     assertEquals(quotas.size, configs.size)
+     quotas.foreach { case (k, v) =>
+       val value = configs.get(k)
+       assertTrue(value != null)
+       assertEquals(v, value, 1e-6)
+     }
+   }
+ }
+
+ /**
+  * Builds a `ClientQuotaEntity` from optional user and client ID names. An absent option omits
+  * that entity type; a present option contributes an entry with the wrapped name as given.
+  */
+ private def toEntity(user: Option[String], clientId: Option[String]) = {
+   val userEntry = user.map(name => ClientQuotaEntity.USER -> name)
+   val clientIdEntry = clientId.map(name => ClientQuotaEntity.CLIENT_ID -> name)
+   new ClientQuotaEntity((userEntry ++ clientIdEntry).toMap.asJava)
+ }
+
+ /**
+  * Sends a describe request for `filter` and returns the quotas the response completes the
+  * future with, keyed by entity. A future completed exceptionally surfaces through `get`.
+  */
+ private def describeClientQuotas(filter: ClientQuotaFilter) = {
+   val response = sendDescribeClientQuotasRequest(filter)
+   val quotasFuture = new KafkaFutureImpl[java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]]]
+   response.complete(quotasFuture)
+   quotasFuture.get
+ }
+
+ /** Builds a describe-client-quotas request for `filter` and sends it to `controllerSocketServer`. */
+ private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): DescribeClientQuotasResponse = {
+   val builder = new DescribeClientQuotasRequest.Builder(filter)
+   connectAndReceive[DescribeClientQuotasResponse](builder.build(), destination = controllerSocketServer)
+ }
+
+ /**
+  * Alters the quotas of a single entity and waits up to 10 seconds for the outcome.
+  * An `ExecutionException` is unwrapped so that its cause is thrown instead.
+  */
+ private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, Option[Double]], validateOnly: Boolean) = {
+   try {
+     val futures = alterClientQuotas(Map(entity -> alter), validateOnly)
+     val entityFuture = futures.get(entity).get
+     entityFuture.get(10, TimeUnit.SECONDS)
+   } catch {
+     case e: ExecutionException => throw e.getCause
+   }
+ }
+
+ /**
+  * Sends one alteration per entity in `request` and returns a future per entity holding the outcome.
+  * A `None` value for a quota key is sent as a null value in the alteration op.
+  */
+ private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String, Option[Double]]], validateOnly: Boolean) = {
+   val alterations = request.map { case (entity, alter) =>
+     val ops = alter.map { case (key, value) =>
+       new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull)
+     }
+     new ClientQuotaAlteration(entity, ops.asJavaCollection)
+   }
+
+   // One future per requested entity; the response completes each of them.
+   val futures = request.map { case (entity, _) => entity -> new KafkaFutureImpl[Void] }.asJava
+   sendAlterClientQuotasRequest(alterations, validateOnly).complete(futures)
+   val result = futures.asScala
+   assertEquals(request.size, result.size)
+   request.keys.foreach(entity => assertTrue(result.contains(entity)))
+   result
+ }
+
+ /** Builds an alter-client-quotas request from `entries` and sends it to `controllerSocketServer`. */
+ private def sendAlterClientQuotasRequest(entries: Iterable[ClientQuotaAlteration], validateOnly: Boolean): AlterClientQuotasResponse = {
+   val builder = new AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly)
+   connectAndReceive[AlterClientQuotasResponse](builder.build(), destination = controllerSocketServer)
+ }
+
+}