You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/10/11 21:33:07 UTC
kafka git commit: MINOR: Push JMX metric name mangling into the
JmxReporter (KIP-190 follow up)
Repository: kafka
Updated Branches:
refs/heads/trunk 54ed3435b -> 7d6ca52a2
MINOR: Push JMX metric name mangling into the JmxReporter (KIP-190 follow up)
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3980 from ewencp/dont-mangle-names
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d6ca52a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d6ca52a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d6ca52a
Branch: refs/heads/trunk
Commit: 7d6ca52a2751908c7fc6b752d70dfaaaaa9bbe8c
Parents: 54ed343
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Oct 11 17:32:40 2017 -0400
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Wed Oct 11 17:32:40 2017 -0400
----------------------------------------------------------------------
.../kafka/clients/admin/KafkaAdminClient.java | 14 ++-
.../kafka/clients/consumer/KafkaConsumer.java | 8 +-
.../kafka/clients/producer/KafkaProducer.java | 8 +-
.../kafka/common/metrics/JmxReporter.java | 3 +-
.../apache/kafka/common/metrics/Sanitizer.java | 61 ------------
.../kafka/common/utils/AppInfoParser.java | 3 +-
.../apache/kafka/common/utils/Sanitizer.java | 61 ++++++++++++
.../kafka/common/metrics/JmxReporterTest.java | 67 ++++++++++++-
.../kafka/common/metrics/SanitizerTest.java | 35 -------
.../kafka/common/utils/SanitizerTest.java | 35 +++++++
.../kafka/connect/runtime/ConnectMetrics.java | 45 +--------
.../connect/runtime/ConnectMetricsTest.java | 24 -----
.../main/scala/kafka/admin/ConfigCommand.scala | 5 +-
.../scala/kafka/network/RequestChannel.scala | 3 +-
.../scala/kafka/server/ClientQuotaManager.scala | 98 ++++++++++----------
.../server/ClientRequestQuotaManager.scala | 4 +-
.../main/scala/kafka/server/ConfigHandler.scala | 10 +-
.../kafka/server/DynamicConfigManager.scala | 2 +-
.../integration/kafka/api/BaseQuotaTest.scala | 8 +-
.../kafka/api/ClientIdQuotaTest.scala | 6 +-
.../kafka/api/UserClientIdQuotaTest.scala | 6 +-
.../integration/kafka/api/UserQuotaTest.scala | 6 +-
.../unit/kafka/admin/ConfigCommandTest.scala | 4 +-
.../kafka/server/ClientQuotaManagerTest.scala | 70 +++++++-------
.../kafka/server/DynamicConfigChangeTest.scala | 1 +
.../unit/kafka/server/RequestQuotaTest.scala | 3 +-
26 files changed, 299 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1a66371..ece27ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -55,7 +55,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sanitizer;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
@@ -290,7 +289,6 @@ public class KafkaAdminClient extends AdminClient {
NetworkClient networkClient = null;
Time time = Time.SYSTEM;
String clientId = generateClientId(config);
- String sanitizedClientId = Sanitizer.sanitize(clientId);
ChannelBuilder channelBuilder = null;
Selector selector = null;
ApiVersions apiVersions = new ApiVersions();
@@ -303,7 +301,7 @@ public class KafkaAdminClient extends AdminClient {
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true);
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
- Map<String, String> metricTags = Collections.singletonMap("client-id", sanitizedClientId);
+ Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -328,7 +326,7 @@ public class KafkaAdminClient extends AdminClient {
true,
apiVersions,
logContext);
- return new KafkaAdminClient(config, clientId, sanitizedClientId, time, metadata, metrics, networkClient,
+ return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient,
timeoutProcessorFactory, logContext);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
@@ -345,7 +343,7 @@ public class KafkaAdminClient extends AdminClient {
try {
metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
- return new KafkaAdminClient(config, clientId, Sanitizer.sanitize(clientId), time, metadata, metrics, client, null,
+ return new KafkaAdminClient(config, clientId, time, metadata, metrics, client, null,
createLogContext(clientId));
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
@@ -357,7 +355,7 @@ public class KafkaAdminClient extends AdminClient {
return new LogContext("[AdminClient clientId=" + clientId + "] ");
}
- private KafkaAdminClient(AdminClientConfig config, String clientId, String sanitizedClientId, Time time, Metadata metadata,
+ private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata,
Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory,
LogContext logContext) {
this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -377,7 +375,7 @@ public class KafkaAdminClient extends AdminClient {
new TimeoutProcessorFactory() : timeoutProcessorFactory;
this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka admin client initialized");
thread.start();
}
@@ -418,7 +416,7 @@ public class KafkaAdminClient extends AdminClient {
// Wait for the thread to be joined.
thread.join();
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka admin client closed.");
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6fb6919..9547aee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -42,7 +42,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sanitizer;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
@@ -647,7 +646,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (clientId.isEmpty())
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
- String sanitizedClientId = Sanitizer.sanitize(this.clientId);
String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
@@ -661,7 +659,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time = Time.SYSTEM;
- Map<String, String> metricsTags = Collections.singletonMap("client-id", sanitizedClientId);
+ Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -772,7 +770,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
isolationLevel);
config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
@@ -1739,7 +1737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ClientUtils.closeQuietly(client, "consumer network client", firstException);
ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer has been closed");
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a202217..b6c0a53 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sanitizer;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
@@ -317,7 +316,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
- String sanitizedClientId = Sanitizer.sanitize(clientId);
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
@@ -329,7 +327,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
- Map<String, String> metricTags = Collections.singletonMap("client-id", sanitizedClientId);
+ Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -427,7 +425,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
- AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
+ AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
@@ -1075,7 +1073,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
- AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics);
+ AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer has been closed");
if (firstException.get() != null && !swallowException)
throw new KafkaException("Failed to close kafka producer", firstException.get());
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 294e1d8..fda37d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -37,6 +37,7 @@ import javax.management.ReflectionException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.utils.Sanitizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +134,7 @@ public class JmxReporter implements MetricsReporter {
mBeanName.append(",");
mBeanName.append(entry.getKey());
mBeanName.append("=");
- mBeanName.append(entry.getValue());
+ mBeanName.append(Sanitizer.sanitize(entry.getValue()));
}
return mBeanName.toString();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
deleted file mode 100644
index b98a426..0000000
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.metrics;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.kafka.common.KafkaException;
-
-/**
- * Utility class for sanitizing/desanitizing user principal and client-ids
- * to a safe value for use in MetricName and as Zookeeper node name
- */
-public class Sanitizer {
-
- public static String sanitize(String name) {
- String encoded = "";
- try {
- encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < encoded.length(); i++) {
- char c = encoded.charAt(i);
- if (c == '*') { // Metric ObjectName treats * as pattern
- builder.append("%2A");
- } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding
- builder.append("%20");
- } else {
- builder.append(c);
- }
- }
- return builder.toString();
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
- }
- }
-
- public static String desanitize(String name) {
- try {
- return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 9a1bab8..42cf312 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sanitizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +70,7 @@ public class AppInfoParser {
public static synchronized void unregisterAppInfo(String prefix, String id, Metrics metrics) {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
try {
- ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id);
+ ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.sanitize(id));
if (server.isRegistered(name))
server.unregisterMBean(name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
new file mode 100644
index 0000000..0b68d0c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Utility class for sanitizing/desanitizing user principal and client-ids
+ * to a safe value for use in JMX metric names and as Zookeeper node name
+ */
+public class Sanitizer {
+
+ public static String sanitize(String name) {
+ String encoded = "";
+ try {
+ encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < encoded.length(); i++) {
+ char c = encoded.charAt(i);
+ if (c == '*') { // Metric ObjectName treats * as pattern
+ builder.append("%2A");
+ } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding
+ builder.append("%20");
+ } else {
+ builder.append(c);
+ }
+ }
+ return builder.toString();
+ } catch (UnsupportedEncodingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ public static String desanitize(String name) {
+ try {
+ return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 3f09e08..3b39db6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -20,19 +20,80 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Total;
import org.junit.Test;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class JmxReporterTest {
@Test
public void testJmxRegistration() throws Exception {
Metrics metrics = new Metrics();
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
try {
metrics.addReporter(new JmxReporter());
+
+ assertFalse(server.isRegistered(new ObjectName(":type=grp1")));
+
Sensor sensor = metrics.sensor("kafka.requests");
sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
- Sensor sensor2 = metrics.sensor("kafka.blah");
- sensor2.add(metrics.metricName("pack.bean1.some", "grp1"), new Total());
- sensor2.add(metrics.metricName("pack.bean2.some", "grp1"), new Total());
+
+ assertTrue(server.isRegistered(new ObjectName(":type=grp1")));
+ assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg"));
+ assertTrue(server.isRegistered(new ObjectName(":type=grp2")));
+ assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total"));
+
+ metrics.removeMetric(metrics.metricName("pack.bean1.avg", "grp1"));
+
+ assertFalse(server.isRegistered(new ObjectName(":type=grp1")));
+ assertTrue(server.isRegistered(new ObjectName(":type=grp2")));
+ assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total"));
+
+ metrics.removeMetric(metrics.metricName("pack.bean2.total", "grp2"));
+
+ assertFalse(server.isRegistered(new ObjectName(":type=grp1")));
+ assertFalse(server.isRegistered(new ObjectName(":type=grp2")));
+ } finally {
+ metrics.close();
+ }
+ }
+
+ @Test
+ public void testJmxRegistrationSanitization() throws Exception {
+ Metrics metrics = new Metrics();
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ try {
+ metrics.addReporter(new JmxReporter());
+
+ Sensor sensor = metrics.sensor("kafka.requests");
+ sensor.add(metrics.metricName("name", "group", "desc", "id", "foo*"), new Total());
+ sensor.add(metrics.metricName("name", "group", "desc", "id", "foo+"), new Total());
+ sensor.add(metrics.metricName("name", "group", "desc", "id", "foo?"), new Total());
+ sensor.add(metrics.metricName("name", "group", "desc", "id", "foo:"), new Total());
+
+ assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%2A")));
+ assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%2A"), "name"));
+ assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%2B")));
+ assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%2B"), "name"));
+ assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%3F")));
+ assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%3F"), "name"));
+ assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%3A")));
+ assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%3A"), "name"));
+
+ metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo*"));
+ metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo+"));
+ metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo?"));
+ metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo:"));
+
+ assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%2A")));
+ assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%2B")));
+ assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%3F")));
+ assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%3A")));
} finally {
metrics.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
deleted file mode 100644
index d66bda1..0000000
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.metrics;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.UnsupportedEncodingException;
-
-import org.junit.Test;
-
-public class SanitizerTest {
-
- @Test
- public void testSanitize() throws UnsupportedEncodingException {
- String principal = "CN=Some characters !@#$%&*()_-+=';:,/~";
- String sanitizedPrincipal = Sanitizer.sanitize(principal);
- assertTrue(sanitizedPrincipal.replace('%', '_').matches("[a-zA-Z0-9\\._\\-]+"));
- assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
new file mode 100644
index 0000000..dd384ee
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+
+import org.junit.Test;
+
+public class SanitizerTest {
+
+ @Test
+ public void testSanitize() throws UnsupportedEncodingException {
+ String principal = "CN=Some characters !@#$%&*()_-+=';:,/~";
+ String sanitizedPrincipal = Sanitizer.sanitize(principal);
+ assertTrue(sanitizedPrincipal.replace('%', '_').matches("[a-zA-Z0-9\\._\\-]+"));
+ assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 3cd1eae..5bbe148 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -64,7 +64,7 @@ public class ConnectMetrics {
* @param time the time; may not be null
*/
public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
- this.workerId = makeValidName(workerId);
+ this.workerId = workerId;
this.time = time;
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
@@ -111,8 +111,7 @@ public class ConnectMetrics {
* Get or create a {@link MetricGroup} with the specified group name and the given tags.
* Each group is uniquely identified by the name and tags.
*
- * @param groupName the name of the metric group; may not be null and must be a
- * {@link #checkNameIsValid(String) valid name}
+ * @param groupName the name of the metric group; may not be null
* @param tagKeyValues pairs of tag name and values
* @return the {@link MetricGroup} that can be used to create metrics; never null
* @throws IllegalArgumentException if the group name is not valid
@@ -130,7 +129,6 @@ public class ConnectMetrics {
}
protected MetricGroupId groupId(String groupName, String... tagKeyValues) {
- checkNameIsValid(groupName);
Map<String, String> tags = tags(tagKeyValues);
return new MetricGroupId(groupName, tags);
}
@@ -262,7 +260,6 @@ public class ConnectMetrics {
* @throws IllegalArgumentException if the name is not valid
*/
public MetricName metricName(MetricNameTemplate template) {
- checkNameIsValid(template.name());
return metrics.metricInstance(template, groupId.tags());
}
@@ -428,8 +425,7 @@ public class ConnectMetrics {
}
/**
- * Create a set of tags using the supplied key and value pairs. Every tag name and value will be
- * {@link #makeValidName(String) made valid} before it is used. The order of the tags will be kept.
+ * Create a set of tags using the supplied key and value pairs. The order of the tags will be kept.
*
* @param keyValue the key and value pairs for the tags; must be an even number
* @return the map of tags that can be supplied to the {@link Metrics} methods; never null
@@ -439,49 +435,18 @@ public class ConnectMetrics {
throw new IllegalArgumentException("keyValue needs to be specified in pairs");
Map<String, String> tags = new LinkedHashMap<>();
for (int i = 0; i < keyValue.length; i += 2) {
- tags.put(makeValidName(keyValue[i]), makeValidName(keyValue[i + 1]));
+ tags.put(keyValue[i], keyValue[i + 1]);
}
return tags;
}
/**
- * Utility to ensure the supplied name contains valid characters, replacing with a single '-' sequences of
- * 1 or more characters <em>other than</em> word characters (e.g., "[a-zA-Z_0-9]").
- *
- * @param name the name; may not be null
- * @return the validated name; never null
- */
- static String makeValidName(String name) {
- Objects.requireNonNull(name);
- name = name.trim();
- if (!name.isEmpty()) {
- name = name.replaceAll("[^\\w]+", "-");
- }
- return name;
- }
-
- /**
- * Utility method that determines whether the supplied name contains only "[a-zA-Z0-9_-]" characters and thus
- * would be unchanged by {@link #makeValidName(String)}.
- *
- * @param name the name; may not be null
- * @return true if the name is valid, or false otherwise
- * @throws IllegalArgumentException if the name is not valid
- */
- static void checkNameIsValid(String name) {
- if (!name.equals(makeValidName(name))) {
- throw new IllegalArgumentException("The name '" + name + "' contains at least one invalid character");
- }
- }
-
- /**
* Utility to generate the documentation for the Connect metrics.
*
* @param args the arguments
*/
public static void main(String[] args) {
ConnectMetricsRegistry metrics = new ConnectMetricsRegistry();
- System.out.println(Metrics.toHtmlTable("kafka.connect", metrics.getAllTemplates()));
+ System.out.println(Metrics.toHtmlTable(JMX_PREFIX, metrics.getAllTemplates()));
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index a16ab41..2de7cb6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -57,30 +57,6 @@ public class ConnectMetricsTest {
}
@Test
- public void testValidatingNameWithAllValidCharacters() {
- String name = "abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-0123456789";
- assertEquals(name, ConnectMetrics.makeValidName(name));
- }
-
- @Test
- public void testValidatingEmptyName() {
- String name = "";
- assertSame(name, ConnectMetrics.makeValidName(name));
- }
-
- @Test(expected = NullPointerException.class)
- public void testValidatingNullName() {
- ConnectMetrics.makeValidName(null);
- }
-
- @Test
- public void testValidatingNameWithInvalidCharacters() {
- assertEquals("a-b-c-d-e-f-g-h-i-j-k", ConnectMetrics.makeValidName("a:b;c/d\\e,f*.--..;;g?h[i]j=k"));
- assertEquals("-a-b-c-d-e-f-g-h-", ConnectMetrics.makeValidName(":a:b;c/d\\e,f*g?[]=h:"));
- assertEquals("a-f-h", ConnectMetrics.makeValidName("a:;/\\,f*?h"));
- }
-
- @Test
public void testKafkaMetricsNotNull() {
assertNotNull(metrics.metrics());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 306d64a..febf40f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,6 +18,7 @@
package kafka.admin
import java.util.Properties
+
import joptsimple._
import kafka.common.Config
import kafka.common.InvalidConfigException
@@ -27,8 +28,8 @@ import kafka.utils.{CommandLineUtils, ZkUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.metrics.Sanitizer
+import org.apache.kafka.common.utils.{Sanitizer, Utils}
+
import scala.collection._
import scala.collection.JavaConverters._
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index ec16ab0..a4ec5e3 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,12 +26,11 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.metrics.Sanitizer
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.log4j.Logger
import scala.collection.mutable
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index afaa5dd..5d0b966 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -19,11 +19,11 @@ package kafka.server
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.{ShutdownableThread, Logging}
+import kafka.utils.{Logging, ShutdownableThread}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
+import org.apache.kafka.common.utils.{Sanitizer, Time}
import scala.collection.JavaConverters._
@@ -61,9 +61,9 @@ object ClientQuotaManagerConfig {
val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
- val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
- val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None)
- val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+ val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+ val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None, None)
+ val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
}
object QuotaTypes {
@@ -73,9 +73,9 @@ object QuotaTypes {
val UserClientIdQuotaEnabled = 4
}
-case class QuotaId(sanitizedUser: Option[String], sanitizedClientId: Option[String])
+case class QuotaId(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String])
-case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, sanitizedClientId: String, quota: Quota)
+case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String, sanitizedClientId: String, quota: Quota)
/**
* Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
@@ -187,7 +187,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case _: QuotaViolationException =>
// Compute the delay
val clientQuotaEntity = clientSensors.quotaEntity
- val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.sanitizedClientId))
+ val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
@@ -213,33 +213,33 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* and the associated quota override or default quota.
*
*/
- private def quotaEntity(sanitizedUser: String, sanitizedClientId: String) : QuotaEntity = {
+ private def quotaEntity(sanitizedUser: String, clientId: String, sanitizedClientId: String) : QuotaEntity = {
quotaTypesEnabled match {
case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
- val quotaId = QuotaId(None, Some(sanitizedClientId))
+ val quotaId = QuotaId(None, Some(clientId), Some(sanitizedClientId))
var quota = overriddenQuota.get(quotaId)
if (quota == null) {
quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId)
if (quota == null)
quota = staticConfigClientIdQuota
}
- QuotaEntity(quotaId, "", sanitizedClientId, quota)
+ QuotaEntity(quotaId, "", clientId, sanitizedClientId, quota)
case QuotaTypes.UserQuotaEnabled =>
- val quotaId = QuotaId(Some(sanitizedUser), None)
+ val quotaId = QuotaId(Some(sanitizedUser), None, None)
var quota = overriddenQuota.get(quotaId)
if (quota == null) {
quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId)
if (quota == null)
quota = ClientQuotaManagerConfig.UnlimitedQuota
}
- QuotaEntity(quotaId, sanitizedUser, "", quota)
+ QuotaEntity(quotaId, sanitizedUser, "", "", quota)
case QuotaTypes.UserClientIdQuotaEnabled =>
- val quotaId = QuotaId(Some(sanitizedUser), Some(sanitizedClientId))
+ val quotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizedClientId))
var quota = overriddenQuota.get(quotaId)
if (quota == null) {
- quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default)))
+ quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)))
if (quota == null) {
- quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(sanitizedClientId)))
+ quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizedClientId)))
if (quota == null) {
quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId)
if (quota == null)
@@ -247,17 +247,17 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
}
- QuotaEntity(quotaId, sanitizedUser, sanitizedClientId, quota)
+ QuotaEntity(quotaId, sanitizedUser, clientId, sanitizedClientId, quota)
case _ =>
- quotaEntityWithMultipleQuotaLevels(sanitizedUser, sanitizedClientId)
+ quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId, sanitizedClientId)
}
}
- private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, sanitizerClientId: String) : QuotaEntity = {
- val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(sanitizerClientId))
+ private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, clientId: String, sanitizerClientId: String) : QuotaEntity = {
+ val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizerClientId))
- val userQuotaId = QuotaId(Some(sanitizedUser), None)
- val clientQuotaId = QuotaId(None, Some(sanitizerClientId))
+ val userQuotaId = QuotaId(Some(sanitizedUser), None, None)
+ val clientQuotaId = QuotaId(None, Some(clientId), Some(sanitizerClientId))
var quotaId = userClientQuotaId
var quotaConfigId = userClientQuotaId
// 1) /config/users/<user>/clients/<client-id>
@@ -265,7 +265,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
if (quota == null) {
// 2) /config/users/<user>/clients/<default>
quotaId = userClientQuotaId
- quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default))
+ quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
quota = overriddenQuota.get(quotaConfigId)
if (quota == null) {
@@ -277,31 +277,31 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
if (quota == null) {
// 4) /config/users/<default>/clients/<client-id>
quotaId = userClientQuotaId
- quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(sanitizerClientId))
+ quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizerClientId))
quota = overriddenQuota.get(quotaConfigId)
if (quota == null) {
// 5) /config/users/<default>/clients/<default>
quotaId = userClientQuotaId
- quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+ quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
quota = overriddenQuota.get(quotaConfigId)
if (quota == null) {
// 6) /config/users/<default>
quotaId = userQuotaId
- quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None)
+ quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None, None)
quota = overriddenQuota.get(quotaConfigId)
if (quota == null) {
// 7) /config/clients/<client-id>
quotaId = clientQuotaId
- quotaConfigId = QuotaId(None, Some(sanitizerClientId))
+ quotaConfigId = QuotaId(None, Some(clientId), Some(sanitizerClientId))
quota = overriddenQuota.get(quotaConfigId)
if (quota == null) {
// 8) /config/clients/<default>
quotaId = clientQuotaId
- quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default))
+ quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
quota = overriddenQuota.get(quotaConfigId)
if (quota == null) {
@@ -317,8 +317,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser
- val quotaClientId = if (quotaId == userQuotaId) "" else sanitizerClientId
- QuotaEntity(quotaId, quotaUser, quotaClientId, quota)
+ val quotaClientId = if (quotaId == userQuotaId) "" else clientId
+ val quotaSanitizedClientId = if (quotaId == userQuotaId) "" else sanitizerClientId
+ QuotaEntity(quotaId, quotaUser, quotaClientId, sanitizerClientId, quota)
}
/**
@@ -327,7 +328,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* Note: this method is expensive, it is meant to be used by tests only
*/
def quota(user: String, clientId: String) = {
- quotaEntity(Sanitizer.sanitize(user), Sanitizer.sanitize(clientId)).quota
+ quotaEntity(Sanitizer.sanitize(user), clientId, Sanitizer.sanitize(clientId)).quota
}
/*
@@ -361,14 +362,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = {
val sanitizedClientId = Sanitizer.sanitize(clientId)
- val clientQuotaEntity = quotaEntity(sanitizedUser, sanitizedClientId)
+ val clientQuotaEntity = quotaEntity(sanitizedUser, clientId, sanitizedClientId)
// Names of the sensors to access
ClientSensors(
clientQuotaEntity,
sensorAccessor.getOrCreate(
getQuotaSensorName(clientQuotaEntity.quotaId),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
- clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.sanitizedClientId),
+ clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId),
Some(getQuotaMetricConfig(clientQuotaEntity.quota)),
new Rate
),
@@ -381,9 +382,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
)
}
- private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.sanitizedClientId.getOrElse("")
+ private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
- private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.sanitizedClientId.getOrElse("")
+ private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
new MetricConfig()
@@ -406,10 +407,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
* for any of these levels.
* @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
- * @param sanitizedClientId client to override if quota applies to <client-id> or <user, client-id>
+ * @param clientId client to override if quota applies to <client-id> or <user, client-id>
+ * @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
* @param quota custom quota to apply or None if quota override is being removed
*/
- def updateQuota(sanitizedUser: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]) {
+ def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]) {
/*
* Acquire the write lock to apply changes in the quota objects.
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
@@ -419,13 +421,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
*/
lock.writeLock().lock()
try {
- val quotaId = QuotaId(sanitizedUser, sanitizedClientId)
+ val quotaId = QuotaId(sanitizedUser, clientId, sanitizedClientId)
val userInfo = sanitizedUser match {
case Some(ConfigEntityName.Default) => "default user "
case Some(user) => "user " + user + " "
case None => ""
}
- val clientIdInfo = sanitizedClientId match {
+ val clientIdInfo = clientId match {
case Some(ConfigEntityName.Default) => "default client-id"
case Some(id) => "client-id " + id
case None => ""
@@ -434,7 +436,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case Some(newQuota) =>
logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}")
overriddenQuota.put(quotaId, newQuota)
- (sanitizedUser, sanitizedClientId) match {
+ (sanitizedUser, clientId) match {
case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
@@ -445,21 +447,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
overriddenQuota.remove(quotaId)
}
- val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), sanitizedClientId.getOrElse(""))
+ val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), clientId.getOrElse(""))
val allMetrics = metrics.metrics()
// If multiple-levels of quotas are defined or if this is a default quota update, traverse metrics
// to find all affected values. Otherwise, update just the single matching one.
val singleUpdate = quotaTypesEnabled match {
case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
- !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !sanitizedClientId.filter(_ == ConfigEntityName.Default).isDefined
+ !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !clientId.filter(_ == ConfigEntityName.Default).isDefined
case _ => false
}
if (singleUpdate) {
// Change the underlying metric config if the sensor has been created
val metric = allMetrics.get(quotaMetricName)
if (metric != null) {
- val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), sanitizedClientId.getOrElse(""))
+ val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""), sanitizedClientId.getOrElse(""))
val newQuota = metricConfigEntity.quota
logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig")
metric.config(getQuotaMetricConfig(newQuota))
@@ -469,7 +471,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case (metricName, metric) =>
val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else ""
val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else ""
- val metricConfigEntity = quotaEntity(userTag, clientIdTag)
+ val metricConfigEntity = quotaEntity(userTag, clientIdTag, Sanitizer.sanitize(clientIdTag))
if (metricConfigEntity.quota != metric.config.quota) {
val newQuota = metricConfigEntity.quota
logger.info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig")
@@ -483,11 +485,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}
- protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: String): MetricName = {
+ protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
metrics.metricName("byte-rate", quotaType.toString,
"Tracking byte-rate per user/client-id",
"user", sanitizedUser,
- "client-id", sanitizedClientId)
+ "client-id", clientId)
}
private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
@@ -495,7 +497,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
quotaType.toString,
"Tracking average throttle-time per user/client-id",
"user", quotaEntity.sanitizedUser,
- "client-id", quotaEntity.sanitizedClientId)
+ "client-id", quotaEntity.clientId)
}
def shutdown() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index d2114dc..f454483 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -64,11 +64,11 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
}
- override protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: String): MetricName = {
+ override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = {
metrics.metricName("request-time", QuotaType.Request.toString,
"Tracking request-time per user/client-id",
"user", sanitizedUser,
- "client-id", sanitizedClientId)
+ "client-id", clientId)
}
private def exemptMetricName: MetricName = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 6f85801..ddeecb0 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -29,8 +29,9 @@ import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigDef.Validator
import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.metrics.{Quota, Sanitizer}
+import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
+import org.apache.kafka.common.utils.Sanitizer
import scala.collection.JavaConverters._
@@ -118,24 +119,25 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
def updateQuotaConfig(sanitizedUser: Option[String], sanitizedClientId: Option[String], config: Properties) {
+ val clientId = sanitizedClientId.map(Sanitizer.desanitize)
val producerQuota =
if (config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp))
Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong, true))
else
None
- quotaManagers.produce.updateQuota(sanitizedUser, sanitizedClientId, producerQuota)
+ quotaManagers.produce.updateQuota(sanitizedUser, clientId, sanitizedClientId, producerQuota)
val consumerQuota =
if (config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp))
Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong, true))
else
None
- quotaManagers.fetch.updateQuota(sanitizedUser, sanitizedClientId, consumerQuota)
+ quotaManagers.fetch.updateQuota(sanitizedUser, clientId, sanitizedClientId, consumerQuota)
val requestQuota =
if (config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp))
Some(new Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble, true))
else
None
- quotaManagers.request.updateQuota(sanitizedUser, sanitizedClientId, requestQuota)
+ quotaManagers.request.updateQuota(sanitizedUser, clientId, sanitizedClientId, requestQuota)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 634b0c2..69f9e96 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -28,7 +28,7 @@ import kafka.admin.AdminUtils
import kafka.utils.json.JsonObject
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.security.scram.ScramMechanism
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Sanitizer, Time}
/**
* Represents all the entities that can be configured via ZK
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index e8967d1..d821f52 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sanitizer}
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
import org.junit.Assert._
import org.junit.{Before, Test}
@@ -210,7 +210,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
val tags = new HashMap[String, String]
- tags.put("client-id", Sanitizer.sanitize(producerClientId))
+ tags.put("client-id", producerClientId)
val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
@@ -220,7 +220,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) {
val tags = new HashMap[String, String]
- tags.put("client-id", Sanitizer.sanitize(consumerClientId))
+ tags.put("client-id", consumerClientId)
val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags))
val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags))
@@ -234,7 +234,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
quotaType.toString,
"Tracking throttle-time per user/client-id",
"user", quotaId.sanitizedUser.getOrElse(""),
- "client-id", quotaId.sanitizedClientId.getOrElse(""))
+ "client-id", quotaId.clientId.getOrElse(""))
}
def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index f5a2cf5..383f139 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -18,8 +18,8 @@ import java.util.Properties
import kafka.admin.AdminUtils
import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
-import org.apache.kafka.common.metrics.Sanitizer
import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Sanitizer
import org.junit.Before
class ClientIdQuotaTest extends BaseQuotaTest {
@@ -27,8 +27,8 @@ class ClientIdQuotaTest extends BaseQuotaTest {
override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName
override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
- override val producerQuotaId = QuotaId(None, Some(Sanitizer.sanitize(producerClientId)))
- override val consumerQuotaId = QuotaId(None, Some(Sanitizer.sanitize(consumerClientId)))
+ override val producerQuotaId = QuotaId(None, Some(producerClientId), Some(Sanitizer.sanitize(producerClientId)))
+ override val consumerQuotaId = QuotaId(None, Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId)))
@Before
override def setUp() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index cb6d376..e25f886 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -20,8 +20,8 @@ import java.util.Properties
import kafka.admin.AdminUtils
import kafka.server._
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Sanitizer
import org.junit.Before
-import org.apache.kafka.common.metrics.Sanitizer
class UserClientIdQuotaTest extends BaseQuotaTest {
@@ -31,8 +31,8 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
override val userPrincipal = "O=A client,CN=localhost"
override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
- override def producerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(Sanitizer.sanitize(producerClientId)))
- override def consumerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(Sanitizer.sanitize(consumerClientId)))
+ override def producerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(producerClientId), Some(Sanitizer.sanitize(producerClientId)))
+ override def consumerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId)))
@Before
override def setUp() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index a7bddc5..b5d88c0 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -21,8 +21,8 @@ import kafka.admin.AdminUtils
import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Sanitizer
import org.junit.{After, Before}
-import org.apache.kafka.common.metrics.Sanitizer
class UserQuotaTest extends BaseQuotaTest with SaslSetup {
@@ -34,8 +34,8 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
override val userPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName2
- override val producerQuotaId = QuotaId(Some(userPrincipal), None)
- override val consumerQuotaId = QuotaId(Some(userPrincipal), None)
+ override val producerQuotaId = QuotaId(Some(userPrincipal), None, None)
+ override val consumerQuotaId = QuotaId(Some(userPrincipal), None, None)
@Before
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index c0aff93..87ce46e 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -23,14 +23,14 @@ import kafka.common.InvalidConfigException
import kafka.server.ConfigEntityName
import kafka.utils.{Logging, ZkUtils}
import kafka.zk.ZooKeeperTestHarness
-
import org.apache.kafka.common.security.scram.ScramCredentialUtils
+import org.apache.kafka.common.utils.Sanitizer
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
+
import scala.collection.mutable
import scala.collection.JavaConverters._
-import org.apache.kafka.common.metrics.Sanitizer
class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 54be960..4196bc1 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -18,8 +18,8 @@ package kafka.server
import java.util.Collections
-import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, Sanitizer}
-import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
+import org.apache.kafka.common.utils.{MockTime, Sanitizer}
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{Before, Test}
@@ -43,8 +43,8 @@ class ClientQuotaManagerTest {
try {
// Case 1: Update the quota. Assert that the new quota value is returned
- clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(2000, true)))
- clientMetrics.updateQuota(client2.configUser, client2.configClientId, Some(new Quota(4000, true)))
+ clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true)))
+ clientMetrics.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true)))
assertEquals("Default producer quota should be " + config.quotaBytesPerSecondDefault, new Quota(config.quotaBytesPerSecondDefault, true), clientMetrics.quota(randomClient.user, randomClient.clientId))
assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota(client1.user, client1.clientId))
@@ -56,22 +56,22 @@ class ClientQuotaManagerTest {
// Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created.
// p1 should no longer be throttled after the quota change
- clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(3000, true)))
+ clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(3000, true)))
assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota(client1.user, client1.clientId))
throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs)
// Case 3: Change quota back to default. Should be throttled again
- clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(500, true)))
+ clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(500, true)))
assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota(client1.user, client1.clientId))
throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback)
assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0)
// Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled
- clientMetrics.updateQuota(client1.configUser, client1.configClientId, None)
- clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, Some(new Quota(4000, true)))
+ clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, None)
+ clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, defaultConfigClient.sanitizedConfigClientId, Some(new Quota(4000, true)))
assertEquals("Should return the newly overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client1.user, client1.clientId))
throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 1000 * config.numQuotaSamples, this.callback)
@@ -161,16 +161,16 @@ class ClientQuotaManagerTest {
}
try {
- quotaManager.updateQuota(Some(ConfigEntityName.Default), None, Some(new Quota(1000, true)))
- quotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(new Quota(2000, true)))
- quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(3000, true)))
- quotaManager.updateQuota(Some("userA"), None, Some(new Quota(4000, true)))
- quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new Quota(5000, true)))
- quotaManager.updateQuota(Some("userB"), None, Some(new Quota(6000, true)))
- quotaManager.updateQuota(Some("userB"), Some("client1"), Some(new Quota(7000, true)))
- quotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.Default), Some(new Quota(8000, true)))
- quotaManager.updateQuota(Some("userC"), None, Some(new Quota(10000, true)))
- quotaManager.updateQuota(None, Some("client1"), Some(new Quota(9000, true)))
+ quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(1000, true)))
+ quotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(2000, true)))
+ quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(3000, true)))
+ quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(4000, true)))
+ quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(5000, true)))
+ quotaManager.updateQuota(Some("userB"), None, None, Some(new Quota(6000, true)))
+ quotaManager.updateQuota(Some("userB"), Some("client1"), Some("client1"), Some(new Quota(7000, true)))
+ quotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(8000, true)))
+ quotaManager.updateQuota(Some("userC"), None, None, Some(new Quota(10000, true)))
+ quotaManager.updateQuota(None, Some("client1"), Some("client1"), Some(new Quota(9000, true)))
checkQuota("userA", "client1", 5000, 4500, false) // <user, client> quota takes precedence over <user>
checkQuota("userA", "client2", 4000, 4500, true) // <user> quota takes precedence over <client> and defaults
@@ -186,32 +186,32 @@ class ClientQuotaManagerTest {
checkQuota("userE", "client1", 3000, 2500, false)
// Remove default <user, client> quota config, revert to <user> default
- quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None)
+ quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None)
checkQuota("userD", "client1", 1000, 0, false) // Metrics tags changed, restart counter
checkQuota("userE", "client4", 1000, 1500, true)
checkQuota("userF", "client4", 1000, 800, false) // Default <user> quota shared across clients of user
checkQuota("userF", "client5", 1000, 800, true)
// Remove default <user> quota config, revert to <client-id> default
- quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None)
+ quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, None)
checkQuota("userF", "client4", 2000, 0, false) // Default <client-id> quota shared across client-id of all users
checkQuota("userF", "client5", 2000, 0, false)
checkQuota("userF", "client5", 2000, 2500, true)
checkQuota("userG", "client5", 2000, 0, true)
// Update quotas
- quotaManager.updateQuota(Some("userA"), None, Some(new Quota(8000, true)))
- quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new Quota(10000, true)))
+ quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(8000, true)))
+ quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10000, true)))
checkQuota("userA", "client2", 8000, 0, false)
checkQuota("userA", "client2", 8000, 4500, true) // Throttled due to sum of new and earlier values
checkQuota("userA", "client1", 10000, 0, false)
checkQuota("userA", "client1", 10000, 6000, true)
- quotaManager.updateQuota(Some("userA"), Some("client1"), None)
+ quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None)
checkQuota("userA", "client6", 8000, 0, true) // Throttled due to shared user quota
- quotaManager.updateQuota(Some("userA"), Some("client6"), Some(new Quota(11000, true)))
+ quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), Some(new Quota(11000, true)))
checkQuota("userA", "client6", 11000, 8500, false)
- quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), Some(new Quota(12000, true)))
- quotaManager.updateQuota(Some("userA"), Some("client6"), None)
+ quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(12000, true)))
+ quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), None)
checkQuota("userA", "client6", 12000, 4000, true) // Throttled due to sum of new and earlier values
} finally {
@@ -271,7 +271,7 @@ class ClientQuotaManagerTest {
def testRequestPercentageQuotaViolation() {
val metrics = newMetrics
val quotaManager = new ClientRequestQuotaManager(config, metrics, time)
- quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some(Quota.upperBound(1)))
+ quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1)))
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond
try {
@@ -373,17 +373,18 @@ class ClientQuotaManagerTest {
}
@Test
- def testSanitizeClientId() {
+ def testClientIdNotSanitized() {
val metrics = newMetrics
val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time)
val clientId = "client@#$%"
try {
clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, callback)
-
- val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + Sanitizer.sanitize(clientId))
+
+ // The metrics should use the raw client ID, even if the reporters internally sanitize them
+ val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + clientId)
assertTrue("Throttle time sensor should exist", throttleTimeSensor != null)
- val byteRateSensor = metrics.getSensor("Produce-:" + Sanitizer.sanitize(clientId))
+ val byteRateSensor = metrics.getSensor("Produce-:" + clientId)
assertTrue("Byte rate sensor should exist", byteRateSensor != null)
} finally {
clientMetrics.shutdown()
@@ -394,5 +395,10 @@ class ClientQuotaManagerTest {
new Metrics(new MetricConfig(), Collections.emptyList(), time)
}
- private case class UserClient(val user: String, val clientId: String, val configUser: Option[String] = None, val configClientId: Option[String] = None)
+ private case class UserClient(val user: String, val clientId: String, val configUser: Option[String] = None, val configClientId: Option[String] = None) {
+ // The class under test expects only sanitized client configs. We pass both the default value (which should not be
+ // sanitized to ensure it remains unique) and non-default values, so we need to take care in generating the sanitized
+ // client ID
+ def sanitizedConfigClientId = configClientId.map(x => if (x == ConfigEntityName.Default) ConfigEntityName.Default else Sanitizer.sanitize(x))
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 9d2bb8b..2e0b454 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -28,6 +28,7 @@ import kafka.integration.KafkaServerTestHarness
import kafka.utils._
import kafka.admin.{AdminOperationException, AdminUtils}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Sanitizer
import scala.collection.Map
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 480dfa6..4774e1d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
+import org.apache.kafka.common.utils.Sanitizer
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -82,7 +83,7 @@ class RequestQuotaTest extends BaseRequestTest {
quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps)
quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
- AdminUtils.changeClientIdConfig(zkUtils, unthrottledClientId, quotaProps)
+ AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(unthrottledClientId), quotaProps)
TestUtils.retry(10000) {
val quotaManager = servers.head.apis.quotas.request