You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/28 01:19:15 UTC
[kafka] branch trunk updated: KAFKA-9960: implement KIP-606 to add
metadata context to MetricsReporter (#8691)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9c833f6 KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter (#8691)
9c833f6 is described below
commit 9c833f665f349e5c292228f75188f5521282835d
Author: xiaodongdu <xi...@hotmail.com>
AuthorDate: Wed May 27 18:18:36 2020 -0700
KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter (#8691)
Implemented KIP-606 to add metadata context to MetricsReporter.
Author: Xiaodong Du <xd...@confluent.io>
Reviewers: David Arthur <mu...@gmail.com>, Randall Hauch <rh...@gmail.com>, Xavier Léauté <xa...@confluent.io>, Ryan Pridgeon <ry...@gmail.com>
---
checkstyle/suppressions.xml | 4 +-
.../apache/kafka/clients/CommonClientConfigs.java | 2 +
.../kafka/clients/admin/KafkaAdminClient.java | 9 +-
.../kafka/clients/consumer/KafkaConsumer.java | 8 +-
.../kafka/clients/producer/KafkaProducer.java | 9 +-
.../apache/kafka/common/metrics/JmxReporter.java | 19 ++-
.../kafka/common/metrics/KafkaMetricsContext.java | 56 +++++++++
.../org/apache/kafka/common/metrics/Metrics.java | 30 ++++-
.../kafka/common/metrics/MetricsContext.java | 49 ++++++++
.../kafka/common/metrics/MetricsReporter.java | 9 ++
.../kafka/clients/consumer/KafkaConsumerTest.java | 22 ++++
.../kafka/clients/producer/KafkaProducerTest.java | 23 ++++
.../kafka/common/metrics/JmxReporterTest.java | 20 +++
.../connect/mirror/MirrorConnectorConfig.java | 10 +-
.../kafka/connect/runtime/ConnectMetrics.java | 21 +++-
.../org/apache/kafka/connect/runtime/Worker.java | 32 +++--
.../apache/kafka/connect/runtime/WorkerConfig.java | 3 +
.../runtime/distributed/WorkerGroupMember.java | 21 +++-
.../connect/storage/KafkaConfigBackingStore.java | 7 +-
.../connect/storage/KafkaOffsetBackingStore.java | 6 +-
.../connect/storage/KafkaStatusBackingStore.java | 5 +
.../apache/kafka/connect/util/ConnectUtils.java | 14 +++
.../kafka/connect/runtime/ConnectMetricsTest.java | 2 +-
.../kafka/connect/runtime/MockConnectMetrics.java | 14 ++-
.../apache/kafka/connect/runtime/WorkerTest.java | 135 +++++++++++++++++++--
.../runtime/WorkerWithTopicCreationTest.java | 47 +++++--
.../runtime/distributed/WorkerGroupMemberTest.java | 103 ++++++++++++++++
.../errors/RetryWithToleranceOperatorTest.java | 2 +-
.../storage/KafkaConfigBackingStoreTest.java | 9 +-
.../storage/KafkaOffsetBackingStoreTest.java | 14 ++-
.../kafka/connect/util/ConnectUtilsTest.java | 39 ++++++
.../scala/kafka/server/DynamicBrokerConfig.scala | 1 +
core/src/main/scala/kafka/server/KafkaServer.scala | 37 ++++--
.../group/GroupMetadataManagerTest.scala | 6 +-
.../transaction/TransactionStateManagerTest.scala | 6 +-
.../kafka/server/KafkaMetricsReporterTest.scala | 95 +++++++++++++++
.../org/apache/kafka/streams/KafkaStreams.java | 9 +-
.../org/apache/kafka/streams/KafkaStreamsTest.java | 4 +-
.../processor/internals/StreamTaskTest.java | 7 +-
.../processor/internals/StreamThreadTest.java | 7 +-
.../state/internals/MeteredKeyValueStoreTest.java | 7 +-
.../state/internals/MeteredSessionStoreTest.java | 7 +-
.../MeteredTimestampedKeyValueStoreTest.java | 7 +-
.../state/internals/MeteredWindowStoreTest.java | 7 +-
44 files changed, 870 insertions(+), 74 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d55743a..d13cd88 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -100,10 +100,10 @@
<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
files="(DistributedHerder|Worker).java"/>
-
+ <suppress checks="ClassFanOutComplexity"
+ files="Worker(|Test).java"/>
<suppress checks="MethodLength"
files="(KafkaConfigBackingStore|Values).java"/>
-
<suppress checks="ParameterNumber"
files="Worker(SinkTask|SourceTask|Coordinator).java"/>
<suppress checks="ParameterNumber"
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index d18c0ed..987389a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -87,6 +87,8 @@ public class CommonClientConfigs {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+ public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
+
public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
Utils.join(SecurityProtocol.names(), ", ") + ".";
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e0e6cb6..1fe0841 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.StaleMetadataException;
@@ -124,8 +125,10 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteReque
import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -462,10 +465,12 @@ public class KafkaAdminClient extends AdminClient {
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
- JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+ JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
- metrics = new Metrics(metricConfig, reporters, time);
+ MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+ config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ metrics = new Metrics(metricConfig, reporters, time, metricsContext);
String metricGrpPrefix = "admin-client";
channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 81d286f..7535a9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -44,8 +44,10 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -868,10 +870,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
- JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+ JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
- return new Metrics(metricConfig, reporters, time);
+ MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+ config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ return new Metrics(metricConfig, reporters, time, metricsContext);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index deecfd8..375ada5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -54,8 +55,10 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -351,10 +354,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
- JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+ JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(userProvidedConfigs);
reporters.add(jmxReporter);
- this.metrics = new Metrics(metricConfig, reporters, time);
+ MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+ config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index c7be865..090f93b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -38,6 +38,7 @@ import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -71,9 +72,13 @@ public class JmxReporter implements MetricsReporter {
/**
* Create a JMX reporter that prefixes all metrics with the given string.
+ * @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+ * Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+ * Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
*/
+ @Deprecated
public JmxReporter(String prefix) {
- this.prefix = prefix;
+ this.prefix = prefix != null ? prefix : "";
}
@Override
@@ -318,4 +323,16 @@ public class JmxReporter implements MetricsReporter {
+ ".(whitelist/blacklist) is not a valid regular expression");
}
}
+
+ @Override
+ public void contextChange(MetricsContext metricsContext) {
+ String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+ Objects.requireNonNull(namespace);
+ synchronized (LOCK) {
+ if (!mbeans.isEmpty()) {
+ throw new IllegalStateException("JMX MetricsContext can only be updated before JMX metrics are created");
+ }
+ prefix = namespace;
+ }
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
new file mode 100644
index 0000000..2fe73de
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+ /**
+ * Client or Service's contextLabels map.
+ */
+ private final Map<String, String> contextLabels = new HashMap<>();
+
+ /**
+ * Create a MetricsContext with namespace, no service or client properties
+ * @param namespace value for _namespace key
+ */
+ public KafkaMetricsContext(String namespace) {
+ this(namespace, new HashMap<>());
+ }
+
+ /**
+ * Create a MetricsContext with namespace, service or client properties
+ * @param namespace value for _namespace key
+ * @param contextLabels contextLabels additional entries to add to the context.
+ * values will be converted to string using Object.toString()
+ */
+ public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) {
+ this.contextLabels.put(MetricsContext.NAMESPACE, namespace);
+ contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString()));
+ }
+
+ @Override
+ public Map<String, String> contextLabels() {
+ return Collections.unmodifiableMap(contextLabels);
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index dee970c..fcaa3b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -122,6 +122,18 @@ public class Metrics implements Closeable {
}
/**
+ * Create a metrics repository with a default config, metric reporters and metric context
+ * Expiration of Sensors is disabled.
+ * @param defaultConfig The default config
+ * @param reporters The metrics reporters
+ * @param time The time instance to use with the metrics
+ * @param metricsContext The metricsContext to initialize metrics reporter with
+ */
+ public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, MetricsContext metricsContext) {
+ this(defaultConfig, reporters, time, false, metricsContext);
+ }
+
+ /**
* Create a metrics repository with a default config, given metric reporters and the ability to expire eligible sensors
* @param defaultConfig The default config
* @param reporters The metrics reporters
@@ -129,14 +141,30 @@ public class Metrics implements Closeable {
* @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
*/
public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration) {
+ this(defaultConfig, reporters, time, enableExpiration, new KafkaMetricsContext(""));
+ }
+
+ /**
+ * Create a metrics repository with a default config, given metric reporters, the ability to expire eligible sensors
+ * and MetricContext
+ * @param defaultConfig The default config
+ * @param reporters The metrics reporters
+ * @param time The time instance to use with the metrics
+ * @param enableExpiration true if the metrics instance can garbage collect inactive sensors, false otherwise
+ * @param metricsContext The metricsContext to initialize metrics reporter with
+ */
+ public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time, boolean enableExpiration,
+ MetricsContext metricsContext) {
this.config = defaultConfig;
this.sensors = new ConcurrentHashMap<>();
this.metrics = new ConcurrentHashMap<>();
this.childrenSensors = new ConcurrentHashMap<>();
this.reporters = Objects.requireNonNull(reporters);
this.time = time;
- for (MetricsReporter reporter : reporters)
+ for (MetricsReporter reporter : reporters) {
+ reporter.contextChange(metricsContext);
reporter.init(new ArrayList<>());
+ }
// Create the ThreadPoolExecutor only if expiration of Sensors is enabled.
if (enableExpiration) {
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
new file mode 100644
index 0000000..dcac5c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional contextLabels about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The contextLabels map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ * e.g. kafka.server, kafka.consumer
+ * {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ * client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+ /* predefined fields */
+ String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+ /**
+ * Returns the labels for this metrics context.
+ *
+ * @return the map of label keys and values; never null but possibly empty
+ */
+ Map<String, String> contextLabels();
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index cc112d1..75771fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
/**
@@ -65,4 +66,12 @@ public interface MetricsReporter extends Reconfigurable, AutoCloseable {
default void reconfigure(Map<String, ?> configs) {
}
+ /**
+ * Sets the context labels for the service or library exposing metrics. This will be called before {@link #init(List)} and may be called anytime after that.
+ *
+ * @param metricsContext the metric context
+ */
+ @InterfaceStability.Evolving
+ default void contextChange(MetricsContext metricsContext) {
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 56f6975..022c688 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -85,9 +85,14 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.common.metrics.stats.Avg;
+
import org.junit.Assert;
import org.junit.Test;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
@@ -383,6 +388,23 @@ public class KafkaConsumerTest {
consumer.close();
}
+ @Test
+ public void testConsumerJmxPrefix() throws Exception {
+ Map<String, Object> config = new HashMap<>();
+ config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+ config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
+ config.put("client.id", "client-1");
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+ config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ MetricName testMetricName = consumer.metrics.metricName("test-metric",
+ "grp1", "test metric");
+ consumer.metrics.addMetric(testMetricName, new Avg());
+ Assert.assertNotNull(server.getObjectInstance(new ObjectName("kafka.consumer:type=grp1,client-id=client-1")));
+ consumer.close();
+ }
+
private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
return newConsumer(groupId, Optional.empty());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 280faa1..3f98e4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -20,12 +20,14 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -40,6 +42,7 @@ import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -66,6 +69,9 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1189,6 +1195,23 @@ public class KafkaProducerTest {
assertionDoneLatch.await(5000, TimeUnit.MILLISECONDS);
}
+ @Test
+ public void testProducerJmxPrefix() throws Exception {
+ Map<String, Object> props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.put("client.id", "client-1");
+
+ KafkaProducer<String, String> producer = new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ MetricName testMetricName = producer.metrics.metricName("test-metric",
+ "grp1", "test metric");
+ producer.metrics.addMetric(testMetricName, new Avg());
+ Assert.assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
+ producer.close();
+ }
+
private ProducerMetadata newMetadata(long refreshBackoffMs, long expirationMs) {
return new ProducerMetadata(refreshBackoffMs, expirationMs, defaultMetadataIdleMs,
new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index ea9597b..5bc831c 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -19,11 +19,14 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.utils.Time;
import org.junit.Test;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -155,4 +158,21 @@ public class JmxReporterTest {
metrics.close();
}
}
+
+ @Test
+ public void testJmxPrefix() throws Exception {
+ JmxReporter reporter = new JmxReporter();
+ MetricsContext metricsContext = new KafkaMetricsContext("kafka.server");
+ MetricConfig metricConfig = new MetricConfig();
+ Metrics metrics = new Metrics(metricConfig, new ArrayList<>(Arrays.asList(reporter)), Time.SYSTEM, metricsContext);
+
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ try {
+ Sensor sensor = metrics.sensor("kafka.requests");
+ sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
+ assertEquals("kafka.server", server.getObjectInstance(new ObjectName("kafka.server:type=grp1")).getObjectName().getDomain());
+ } finally {
+ metrics.close();
+ }
+ }
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 72dd435..635eedf 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -19,8 +19,10 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -270,9 +272,15 @@ public class MirrorConnectorConfig extends AbstractConfig {
List<MetricsReporter> metricsReporters() {
List<MetricsReporter> reporters = getConfiguredInstances(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
- JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+ JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(this.originals());
reporters.add(jmxReporter);
+ MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");
+
+ for (MetricsReporter reporter : reporters) {
+ reporter.contextChange(metricsContext);
+ }
+
return reporters;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 57b5595..a3f44f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -21,16 +21,20 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -62,8 +66,9 @@ public class ConnectMetrics {
* @param workerId the worker identifier; may not be null
* @param config the worker configuration; may not be null
* @param time the time; may not be null
+ * @param clusterId the Kafka cluster ID
*/
- public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+ public ConnectMetrics(String workerId, WorkerConfig config, Time time, String clusterId) {
this.workerId = workerId;
this.time = time;
@@ -75,10 +80,20 @@ public class ConnectMetrics {
MetricConfig metricConfig = new MetricConfig().samples(numSamples)
.timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS).recordLevel(
Sensor.RecordingLevel.forName(metricsRecordingLevel));
- JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+ JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
- this.metrics = new Metrics(metricConfig, reporters, time);
+
+ Map<String, Object> contextLabels = new HashMap<>();
+ contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+ Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
+ if (groupId != null) {
+ contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, groupId);
+ }
+ MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
+ this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+
LOG.debug("Registering Connect metrics with JMX for worker '{}'", workerId);
AppInfoParser.registerAppInfo(JMX_PREFIX, workerId, metrics, time.milliseconds());
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index d0a04e8..28b1149 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -57,6 +57,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.apache.kafka.connect.util.SinkUtils;
@@ -94,6 +95,8 @@ public class Worker {
private final ExecutorService executor;
private final Time time;
private final String workerId;
+ //kafka cluster id
+ private final String kafkaClusterId;
private final Plugins plugins;
private final ConnectMetrics metrics;
private final WorkerMetricsGroup workerMetricsGroup;
@@ -129,7 +132,8 @@ public class Worker {
ExecutorService executorService,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
) {
- this.metrics = new ConnectMetrics(workerId, config, time);
+ this.kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+ this.metrics = new ConnectMetrics(workerId, config, time, kafkaClusterId);
this.executor = executorService;
this.workerId = workerId;
this.time = time;
@@ -530,13 +534,13 @@ public class Worker {
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
- connectorClientConfigOverridePolicy);
+ connectorClientConfigOverridePolicy, kafkaClusterId);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
TopicAdmin admin;
Map<String, TopicCreationGroup> topicCreationGroups;
if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) {
Map<String, Object> adminProps = adminConfigs(id, "connector-adminclient-" + id, config,
- sourceConfig, connectorClass, connectorClientConfigOverridePolicy);
+ sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
admin = new TopicAdmin(adminProps);
topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
} else {
@@ -554,7 +558,7 @@ public class Worker {
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
- Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+ Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
@@ -571,7 +575,8 @@ public class Worker {
WorkerConfig config,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
- ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ String clusterId) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -586,6 +591,8 @@ public class Worker {
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, defaultClientId);
// User-specified overrides
producerProps.putAll(config.originalsWithPrefix("producer."));
+ //add client metrics.context properties
+ ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
// Connector-specified overrides
Map<String, Object> producerOverrides =
@@ -601,7 +608,8 @@ public class Worker {
WorkerConfig config,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
- ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ String clusterId) {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
Map<String, Object> consumerProps = new HashMap<>();
@@ -616,6 +624,8 @@ public class Worker {
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.putAll(config.originalsWithPrefix("consumer."));
+ //add client metrics.context properties
+ ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
// Connector-specified overrides
Map<String, Object> consumerOverrides =
connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
@@ -631,7 +641,8 @@ public class Worker {
WorkerConfig config,
ConnectorConfig connConfig,
Class<? extends Connector> connectorClass,
- ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ String clusterId) {
Map<String, Object> adminProps = new HashMap<>();
// Use the top-level worker configs to retain backwards compatibility with older releases which
// did not require a prefix for connector admin client configs in the worker configuration file
@@ -658,6 +669,9 @@ public class Worker {
connectorClientConfigOverridePolicy);
adminProps.putAll(adminOverrides);
+ //add client metrics.context properties
+ ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
+
return adminProps;
}
@@ -701,8 +715,8 @@ public class Worker {
String topic = connConfig.dlqTopicName();
if (topic != null && !topic.isEmpty()) {
Map<String, Object> producerProps = producerConfigs(id, "connector-dlq-producer-" + id, config, connConfig, connectorClass,
- connectorClientConfigOverridePolicy);
- Map<String, Object> adminProps = adminConfigs(id, "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy);
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ Map<String, Object> adminProps = adminConfigs(id, "connector-dlq-adminclient-", config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(adminProps, id, connConfig, producerProps, errorHandlingMetrics);
reporters.add(reporter);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 0234926..9e40e56 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -251,6 +251,9 @@ public class WorkerConfig extends AbstractConfig {
+ "user requests to reset the set of active topics per connector.";
protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
+ public static final String CONNECT_KAFKA_CLUSTER_ID = "connect.kafka.cluster.id";
+ public static final String CONNECT_GROUP_ID = "connect.group.id";
+
public static final String TOPIC_CREATION_ENABLE_CONFIG = "topic.creation.enable";
protected static final String TOPIC_CREATION_ENABLE_DOC = "Whether to allow "
+ "automatic creation of topics used by source connectors, when source connectors "
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index a0a00e1..40f2b07 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -27,6 +27,8 @@ import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
@@ -36,12 +38,15 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -88,10 +93,17 @@ public class WorkerGroupMember {
List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId));
- JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+ JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
- this.metrics = new Metrics(metricConfig, reporters, time);
+
+ Map<String, Object> contextLabels = new HashMap<>();
+ contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(config));
+ contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG));
+ MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
+
+ this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
logContext, new ClusterResourceListeners());
@@ -223,4 +235,9 @@ public class WorkerGroupMember {
else
log.debug("The Connect group member has stopped.");
}
+
+ // Visible for testing
+ Metrics metrics() {
+ return this.metrics;
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index b9e9831..60c2665 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
@@ -444,17 +445,21 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
// package private for testing
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
+ String clusterId = ConnectUtils.lookupKafkaClusterId(config);
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+ ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
+
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
Map<String, Object> adminProps = new HashMap<>(originals);
-
+ ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).configStorageTopicSettings()
: Collections.emptyMap();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 6100d07..1d26d65 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConvertingFutureCallback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
@@ -66,6 +67,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
if (topic == null || topic.trim().length() == 0)
throw new ConfigException("Offset storage topic must be specified");
+ String clusterId = ConnectUtils.lookupKafkaClusterId(config);
data = new HashMap<>();
Map<String, Object> originals = config.originals();
@@ -73,13 +75,15 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+ ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
Map<String, Object> adminProps = new HashMap<>(originals);
-
+ ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).offsetStorageTopicSettings()
: Collections.emptyMap();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 9827300..0b0f4a5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.Table;
@@ -152,17 +153,21 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
if (this.statusTopic == null || this.statusTopic.trim().length() == 0)
throw new ConfigException("Must specify topic for connector status.");
+ String clusterId = ConnectUtils.lookupKafkaClusterId(config);
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+ ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
Map<String, Object> adminProps = new HashMap<>(originals);
+ ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).statusStorageTopicSettings()
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index e1e4a87..7187f63 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -16,15 +16,18 @@
*/
package org.apache.kafka.connect.util;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
public final class ConnectUtils {
@@ -65,4 +68,15 @@ public final class ConnectUtils {
+ "Check worker's broker connection and security properties.", e);
}
}
+
+ public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+ //add all properties predefined with "metrics.context."
+ prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+ //add connect properties
+ prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+ Object groupId = config.originals().get(DistributedConfig.GROUP_ID_CONFIG);
+ if (groupId != null) {
+ prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, groupId);
+ }
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index fe15c01..3f75c0b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -53,7 +53,7 @@ public class ConnectMetricsTest {
@Before
public void setUp() {
- metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime());
+ metrics = new ConnectMetrics("worker1", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), new MockTime(), "cluster-1");
}
@After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index 64ea526..38bd401 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.MockTime;
@@ -58,7 +59,7 @@ public class MockConnectMetrics extends ConnectMetrics {
}
public MockConnectMetrics(MockTime time) {
- super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time);
+ super("mock", new WorkerConfig(WorkerConfig.baseConfigDef(), DEFAULT_WORKER_CONFIG), time, "cluster-1");
}
@Override
@@ -154,6 +155,8 @@ public class MockConnectMetrics extends ConnectMetrics {
public static class MockMetricsReporter implements MetricsReporter {
private Map<MetricName, KafkaMetric> metricsByName = new HashMap<>();
+ private MetricsContext metricsContext;
+
public MockMetricsReporter() {
}
@@ -194,5 +197,14 @@ public class MockConnectMetrics extends ConnectMetrics {
KafkaMetric metric = metricsByName.get(metricName);
return metric != null ? metric.metricValue() : null;
}
+
+ @Override
+ public void contextChange(MetricsContext metricsContext) {
+ this.metricsContext = metricsContext;
+ }
+
+ public MetricsContext getMetricsContext() {
+ return this.metricsContext;
+ }
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index e32fcb0..48ab58f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -21,9 +21,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -58,6 +61,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
@@ -74,12 +78,17 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -96,13 +105,14 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({Worker.class, Plugins.class})
+@PrepareForTest({Worker.class, Plugins.class, ConnectUtils.class})
@PowerMockIgnore("javax.management.*")
public class WorkerTest extends ThreadedTest {
private static final String CONNECTOR_ID = "test-connector";
private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
private static final String WORKER_ID = "localhost:8083";
+ private static final String CLUSTER_ID = "test-cluster";
private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
@@ -227,6 +237,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -288,6 +299,8 @@ public class WorkerTest extends ThreadedTest {
);
EasyMock.expectLastCall();
+ expectClusterId();
+
PowerMock.replayAll();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -345,6 +358,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -407,6 +421,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -435,6 +450,7 @@ public class WorkerTest extends ThreadedTest {
expectConverters();
expectStartStorage();
expectFileConfigProvider();
+ expectClusterId();
PowerMock.replayAll();
@@ -491,6 +507,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -586,6 +603,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -685,6 +703,8 @@ public class WorkerTest extends ThreadedTest {
workerTask.stop();
EasyMock.expectLastCall();
+ expectClusterId();
+
PowerMock.replayAll();
worker = new Worker(WORKER_ID,
@@ -744,6 +764,8 @@ public class WorkerTest extends ThreadedTest {
taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
EasyMock.expectLastCall();
+ expectClusterId();
+
PowerMock.replayAll();
worker = new Worker(WORKER_ID,
@@ -791,6 +813,8 @@ public class WorkerTest extends ThreadedTest {
taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
EasyMock.expectLastCall();
+ expectClusterId();
+
PowerMock.replayAll();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -867,6 +891,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -940,6 +965,7 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -976,8 +1002,9 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
expectedConfigs.put("client.id", "connector-producer-job-0");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy));
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -992,11 +1019,13 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("acks", "-1");
expectedConfigs.put("linger.ms", "1000");
expectedConfigs.put("client.id", "producer-test-id");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
new HashMap<String, Object>());
PowerMock.replayAll();
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -1012,6 +1041,8 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("linger.ms", "5000");
expectedConfigs.put("batch.size", "1000");
expectedConfigs.put("client.id", "producer-test-id");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
Map<String, Object> connConfig = new HashMap<String, Object>();
connConfig.put("linger.ms", "5000");
connConfig.put("batch.size", "1000");
@@ -1019,7 +1050,7 @@ public class WorkerTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -1027,10 +1058,12 @@ public class WorkerTest extends ThreadedTest {
Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
expectedConfigs.put("group.id", "connect-test");
expectedConfigs.put("client.id", "connector-consumer-test-1");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy));
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -1046,10 +1079,12 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("auto.offset.reset", "latest");
expectedConfigs.put("max.poll.records", "1000");
expectedConfigs.put("client.id", "consumer-test-id");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy));
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@@ -1066,6 +1101,8 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("max.poll.records", "5000");
expectedConfigs.put("max.poll.interval.ms", "1000");
expectedConfigs.put("client.id", "connector-consumer-test-1");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
+
Map<String, Object> connConfig = new HashMap<String, Object>();
connConfig.put("max.poll.records", "5000");
connConfig.put("max.poll.interval.ms", "1000");
@@ -1073,7 +1110,7 @@ public class WorkerTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, allConnectorClientConfigOverridePolicy));
+ null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test(expected = ConnectException.class)
@@ -1090,7 +1127,7 @@ public class WorkerTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy);
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
}
@Test
@@ -1110,12 +1147,14 @@ public class WorkerTest extends ThreadedTest {
expectedConfigs.put("bootstrap.servers", "localhost:9092");
expectedConfigs.put("client.id", "testid");
expectedConfigs.put("metadata.max.age.ms", "10000");
+ //we added a config on the fly
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", CLUSTER_ID);
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
.andReturn(connConfig);
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
- null, allConnectorClientConfigOverridePolicy));
+ null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test(expected = ConnectException.class)
@@ -1132,7 +1171,78 @@ public class WorkerTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy);
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
+
+ }
+
+ @Test
+ public void testWorkerMetrics() throws Exception {
+ expectConverters();
+ expectStartStorage();
+ expectFileConfigProvider();
+
+ // Create
+ EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
+ EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
+ .andReturn(sourceConnector);
+ EasyMock.expect(sourceConnector.version()).andReturn("1.0");
+
+ Map<String, String> props = new HashMap<>();
+ props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+ props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+ props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
+
+ EasyMock.expect(sourceConnector.version()).andReturn("1.0");
+
+ EasyMock.expect(plugins.compareAndSwapLoaders(sourceConnector))
+ .andReturn(delegatingLoader)
+ .times(2);
+ sourceConnector.initialize(anyObject(ConnectorContext.class));
+ EasyMock.expectLastCall();
+ sourceConnector.start(props);
+ EasyMock.expectLastCall();
+
+ EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader))
+ .andReturn(pluginLoader).times(2);
+
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
+ // Remove
+ sourceConnector.stop();
+ EasyMock.expectLastCall();
+
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
+ expectStopStorage();
+ expectClusterId();
+
+ PowerMock.replayAll();
+
+ Worker worker = new Worker("worker-1",
+ Time.SYSTEM,
+ plugins,
+ config,
+ offsetBackingStore,
+ noneConnectorClientConfigOverridePolicy
+ );
+ MetricName name = worker.metrics().metrics().metricName("test.avg", "grp1");
+ worker.metrics().metrics().addMetric(name, new Avg());
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectInstance> ret = server.queryMBeans(null, null);
+
+ List<MetricsReporter> list = worker.metrics().metrics().reporters();
+ for (MetricsReporter reporter : list) {
+ if (reporter instanceof MockMetricsReporter) {
+ MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) reporter;
+ //verify connect cluster is set in MetricsContext
+ assertEquals(CLUSTER_ID, mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+ }
+ }
+ //verify metric is created with correct jmx prefix
+ assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1")));
}
private void assertStatusMetrics(long expected, String metricName) {
@@ -1272,6 +1382,11 @@ public class WorkerTest extends ThreadedTest {
return props;
}
+ private void expectClusterId() {
+ PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+ EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+ }
+
private void expectNewWorkerTask() throws Exception {
PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(TASK_ID),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
index e3c982e..8b54033 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
@@ -57,6 +57,7 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
@@ -98,13 +99,14 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({Worker.class, Plugins.class})
+@PrepareForTest({Worker.class, Plugins.class, ConnectUtils.class})
@PowerMockIgnore("javax.management.*")
public class WorkerWithTopicCreationTest extends ThreadedTest {
private static final String CONNECTOR_ID = "test-connector";
private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
private static final String WORKER_ID = "localhost:8083";
+ private static final String CLUSTER_ID = "test-cluster";
private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
private final ConnectorClientConfigOverridePolicy allConnectorClientConfigOverridePolicy = new AllConnectorClientConfigOverridePolicy();
@@ -228,6 +230,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -288,6 +291,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.<ConnectException>anyObject()
);
EasyMock.expectLastCall();
+ expectClusterId();
PowerMock.replayAll();
@@ -346,6 +350,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -408,6 +413,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -436,6 +442,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
expectConverters();
expectStartStorage();
expectFileConfigProvider();
+ expectClusterId();
PowerMock.replayAll();
@@ -492,6 +499,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -587,6 +595,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -685,6 +694,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expect(workerTask.loader()).andReturn(pluginLoader);
workerTask.stop();
EasyMock.expectLastCall();
+ expectClusterId();
PowerMock.replayAll();
@@ -744,6 +754,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
EasyMock.expectLastCall();
+ expectClusterId();
PowerMock.replayAll();
@@ -791,6 +802,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<ConfigException>anyObject());
EasyMock.expectLastCall();
+ expectClusterId();
PowerMock.replayAll();
@@ -868,6 +880,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -941,6 +954,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
EasyMock.expectLastCall();
expectStopStorage();
+ expectClusterId();
PowerMock.replayAll();
@@ -977,8 +991,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
PowerMock.replayAll();
Map<String, String> expectedConfigs = new HashMap<>(defaultProducerConfigs);
expectedConfigs.put("client.id", "connector-producer-job-0");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy));
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, config, connectorConfig, null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -993,11 +1008,12 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
expectedConfigs.put("acks", "-1");
expectedConfigs.put("linger.ms", "1000");
expectedConfigs.put("client.id", "producer-test-id");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX)).andReturn(
new HashMap<String, Object>());
PowerMock.replayAll();
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -1013,6 +1029,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
expectedConfigs.put("linger.ms", "5000");
expectedConfigs.put("batch.size", "1000");
expectedConfigs.put("client.id", "producer-test-id");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
Map<String, Object> connConfig = new HashMap<String, Object>();
connConfig.put("linger.ms", "5000");
connConfig.put("batch.size", "1000");
@@ -1020,7 +1037,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
assertEquals(expectedConfigs,
- Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy));
+ Worker.producerConfigs(TASK_ID, "connector-producer-" + TASK_ID, configWithOverrides, connectorConfig, null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -1028,10 +1045,11 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
Map<String, String> expectedConfigs = new HashMap<>(defaultConsumerConfigs);
expectedConfigs.put("group.id", "connect-test");
expectedConfigs.put("client.id", "connector-consumer-test-1");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), config, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy));
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test
@@ -1047,10 +1065,11 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
expectedConfigs.put("auto.offset.reset", "latest");
expectedConfigs.put("max.poll.records", "1000");
expectedConfigs.put("client.id", "consumer-test-id");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX)).andReturn(new HashMap<>());
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy));
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@@ -1067,6 +1086,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
expectedConfigs.put("max.poll.records", "5000");
expectedConfigs.put("max.poll.interval.ms", "1000");
expectedConfigs.put("client.id", "connector-consumer-test-1");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
Map<String, Object> connConfig = new HashMap<String, Object>();
connConfig.put("max.poll.records", "5000");
connConfig.put("max.poll.interval.ms", "1000");
@@ -1074,7 +1094,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, allConnectorClientConfigOverridePolicy));
+ null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test(expected = ConnectException.class)
@@ -1091,7 +1111,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
Worker.consumerConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy);
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
}
@Test
@@ -1111,12 +1131,13 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
expectedConfigs.put("bootstrap.servers", "localhost:9092");
expectedConfigs.put("client.id", "testid");
expectedConfigs.put("metadata.max.age.ms", "10000");
+ expectedConfigs.put("metrics.context.connect.kafka.cluster.id", "test-cluster");
EasyMock.expect(connectorConfig.originalsWithPrefix(ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX))
.andReturn(connConfig);
PowerMock.replayAll();
assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
- null, allConnectorClientConfigOverridePolicy));
+ null, allConnectorClientConfigOverridePolicy, CLUSTER_ID));
}
@Test(expected = ConnectException.class)
@@ -1133,7 +1154,7 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
.andReturn(connConfig);
PowerMock.replayAll();
Worker.adminConfigs(new ConnectorTaskId("test", 1), "", configWithOverrides, connectorConfig,
- null, noneConnectorClientConfigOverridePolicy);
+ null, noneConnectorClientConfigOverridePolicy, CLUSTER_ID);
}
private void assertStatusMetrics(long expected, String metricName) {
@@ -1299,6 +1320,12 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
anyObject(StatusBackingStore.class))
.andReturn(workerTask);
}
+
+ private void expectClusterId() {
+ PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+ EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+ }
+
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class WorkerTestConnector extends SourceConnector {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
new file mode 100644
index 0000000..05cd017
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+ @Mock
+ private ConfigBackingStore configBackingStore;
+ @Mock
+ private StatusBackingStore statusBackingStore;
+
+ @Test
+ public void testMetrics() throws Exception {
+ WorkerGroupMember member;
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ workerProps.put("group.id", "group-1");
+ workerProps.put("offset.storage.topic", "topic-1");
+ workerProps.put("config.storage.topic", "topic-1");
+ workerProps.put("status.storage.topic", "topic-1");
+ workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+ DistributedConfig config = new DistributedConfig(workerProps);
+
+
+ LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+
+ expectClusterId();
+
+ member = new WorkerGroupMember(config, "", configBackingStore,
+ null, Time.SYSTEM, "client-1", logContext);
+
+ boolean entered = false;
+ for (MetricsReporter reporter : member.metrics().reporters()) {
+ if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+ entered = true;
+ MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
+ assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+ assertEquals("group-1", mockMetricsReporter.getMetricsContext().contextLabels().get(WorkerConfig.CONNECT_GROUP_ID));
+ }
+ }
+ assertTrue("Failed to verify MetricsReporter", entered);
+
+ MetricName name = member.metrics().metricName("test.avg", "grp1");
+ member.metrics().addMetric(name, new Avg());
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ //verify metric exists with correct prefix
+ assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1,client-id=client-1")));
+ }
+ private void expectClusterId() {
+ PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+ EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("cluster-1").anyTimes();
+ PowerMock.replay(ConnectUtils.class);
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index be72314..43bac54 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -79,7 +79,7 @@ public class RetryWithToleranceOperatorTest {
NOOP_OPERATOR.metrics(new ErrorHandlingMetrics(
new ConnectorTaskId("noop-connector", -1),
- new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime()))
+ new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime(), "test-cluster"))
);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 61fe51f..8d78219 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TestFuture;
@@ -62,7 +63,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConfigBackingStore.class)
+@PrepareForTest({KafkaConfigBackingStore.class, ConnectUtils.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
@SuppressWarnings({"unchecked", "deprecation"})
public class KafkaConfigBackingStoreTest {
@@ -147,6 +148,10 @@ public class KafkaConfigBackingStoreTest {
@Before
public void setUp() {
+ PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+ EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+ PowerMock.replay(ConnectUtils.class);
+
configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG, null);
Whitebox.setInternalState(configStorage, "configLog", storeLog);
configStorage.setUpdateListener(configUpdateListener);
@@ -157,7 +162,6 @@ public class KafkaConfigBackingStoreTest {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
expectStop();
-
PowerMock.replayAll();
Map<String, String> settings = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
@@ -982,5 +986,4 @@ public class KafkaConfigBackingStoreTest {
result.put(field.name(), struct.get(field));
return result;
}
-
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index b72f269..0d0194f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -58,7 +59,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaOffsetBackingStore.class)
+@PrepareForTest({KafkaOffsetBackingStore.class, ConnectUtils.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
@SuppressWarnings({"unchecked", "deprecation"})
public class KafkaOffsetBackingStoreTest {
@@ -119,6 +120,7 @@ public class KafkaOffsetBackingStoreTest {
expectConfigure();
expectStart(Collections.emptyList());
expectStop();
+ expectClusterId();
PowerMock.replayAll();
@@ -152,6 +154,7 @@ public class KafkaOffsetBackingStoreTest {
new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array())
));
expectStop();
+ expectClusterId();
PowerMock.replayAll();
@@ -214,6 +217,7 @@ public class KafkaOffsetBackingStoreTest {
}
});
+ expectClusterId();
PowerMock.replayAll();
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
@@ -284,6 +288,7 @@ public class KafkaOffsetBackingStoreTest {
});
expectStop();
+ expectClusterId();
PowerMock.replayAll();
@@ -337,6 +342,8 @@ public class KafkaOffsetBackingStoreTest {
storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
PowerMock.expectLastCall();
+ expectClusterId();
+
PowerMock.replayAll();
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
@@ -404,6 +411,11 @@ public class KafkaOffsetBackingStoreTest {
PowerMock.expectLastCall();
}
+ private void expectClusterId() {
+ PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
+ EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+ }
+
private static ByteBuffer buffer(String v) {
return ByteBuffer.wrap(v.getBytes());
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index f7e092c6..f92143a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -16,13 +16,19 @@
*/
package org.apache.kafka.connect.util;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.junit.Test;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -61,4 +67,37 @@ public class ConnectUtilsTest {
ConnectUtils.lookupKafkaClusterId(adminClient);
}
+ @Test
+ public void testAddMetricsContextPropertiesDistributed() {
+ Map<String, String> props = new HashMap<>();
+ props.put(DistributedConfig.GROUP_ID_CONFIG, "connect-cluster");
+ props.put(DistributedConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+ props.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+ props.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connect-status");
+ props.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ props.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ DistributedConfig config = new DistributedConfig(props);
+
+ Map<String, Object> prop = new HashMap<>();
+ ConnectUtils.addMetricsContextProperties(prop, config, "cluster-1");
+ assertEquals("connect-cluster", prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID));
+ assertEquals("cluster-1", prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+ }
+
+ @Test
+ public void testAddMetricsContextPropertiesStandalone() {
+ Map<String, String> props = new HashMap<>();
+ props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "offsetStorageFile");
+ props.put(StandaloneConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ StandaloneConfig config = new StandaloneConfig(props);
+
+ Map<String, Object> prop = new HashMap<>();
+ ConnectUtils.addMetricsContextProperties(prop, config, "cluster-1");
+ assertEquals(null, prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID));
+ assertEquals("cluster-1", prop.get(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+
+ }
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f41dbf2..ec8e00b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -793,6 +793,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
currentReporters += reporter.getClass.getName -> reporter
}
server.notifyClusterListeners(reporters.asScala)
+ server.notifyMetricsReporters(reporters.asScala)
}
private def removeReporter(className: String): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 05a6257..bad56aa 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -35,10 +35,10 @@ import kafka.network.SocketServer
import kafka.security.CredentialProvider
import kafka.utils._
import kafka.zk.{BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils, CommonClientConfigs}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.ControlledShutdownRequestData
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
@@ -129,7 +129,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
private var shutdownLatch = new CountDownLatch(1)
- private val jmxPrefix: String = "kafka.server"
+ //properties for MetricsContext
+ private val metricsPrefix: String = "kafka.server"
+ private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
+ private val KAFKA_BROKER_ID: String = "kafka.broker.id"
+
private var logContext: LogContext = null
@@ -247,13 +251,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
kafkaYammerMetrics.configure(config.originals)
- val jmxReporter = new JmxReporter(jmxPrefix)
+ val jmxReporter = new JmxReporter()
jmxReporter.configure(config.originals)
val reporters = new util.ArrayList[MetricsReporter]
reporters.add(jmxReporter)
+
val metricConfig = KafkaServer.metricConfig(config)
- metrics = new Metrics(metricConfig, reporters, time, true)
+ val metricsContext = createKafkaMetricsContext()
+ metrics = new Metrics(metricConfig, reporters, time, true, metricsContext)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
@@ -365,7 +371,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
- AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics, time.milliseconds())
+ AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info("started")
}
}
@@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
}
+ private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
+ val metricsContext = createKafkaMetricsContext()
+ metricsReporters.foreach {
+ case x: MetricsReporter => x.contextChange(metricsContext)
+ case _ => //do nothing
+ }
+ }
+
+ private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
+ val contextLabels = new util.HashMap[String, Object]
+ contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+ contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+ contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
+ val metricsContext = new KafkaMetricsContext(metricsPrefix, contextLabels)
+ metricsContext
+ }
+
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel)
@@ -687,7 +710,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
startupComplete.set(false)
isShuttingDown.set(false)
- CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString, metrics), this)
+ CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.brokerId.toString, metrics), this)
shutdownLatch.countDown()
info("shut down completed")
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 405f12c..6266d7f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics => kMetrics}
+import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics => kMetrics}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -2356,7 +2356,9 @@ class GroupMetadataManagerTest {
def testPartitionLoadMetric(): Unit = {
val server = ManagementFactory.getPlatformMBeanServer
val mBeanName = "kafka.server:type=group-coordinator-metrics"
- val reporter = new JmxReporter("kafka.server")
+ val reporter = new JmxReporter
+ val metricsContext = new KafkaMetricsContext("kafka.server")
+ reporter.contextChange(metricsContext)
metrics.addReporter(reporter)
def partitionLoadTime(attribute: String): Double = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index e02f46d..3055742 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -28,7 +28,7 @@ import kafka.utils.{MockScheduler, Pool, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
+import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, Metrics}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -773,7 +773,9 @@ class TransactionStateManagerTest {
def testPartitionLoadMetric(): Unit = {
val server = ManagementFactory.getPlatformMBeanServer
val mBeanName = "kafka.server:type=transaction-coordinator-metrics"
- val reporter = new JmxReporter("kafka.server")
+ val reporter = new JmxReporter
+ val metricsContext = new KafkaMetricsContext("kafka.server")
+ reporter.contextChange(metricsContext)
metrics.addReporter(reporter)
def partitionLoadTime(attribute: String): Double = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
new file mode 100644
index 0000000..4e0c9f8
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+
+import java.util.concurrent.atomic.AtomicReference
+
+import kafka.utils.{CoreUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter}
+import org.junit.Assert.{assertEquals}
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+
+
+object KafkaMetricsReporterTest {
+ val setupError = new AtomicReference[String]("")
+
+ class MockMetricsReporter extends MetricsReporter {
+ def init(metrics: util.List[KafkaMetric]): Unit = {}
+
+ def metricChange(metric: KafkaMetric): Unit = {}
+
+ def metricRemoval(metric: KafkaMetric): Unit = {}
+
+ override def close(): Unit = {}
+
+ override def contextChange(metricsContext: MetricsContext): Unit = {
+ //read jmxPrefix
+
+ MockMetricsReporter.JMXPREFIX.set(metricsContext.contextLabels().get("_namespace").toString)
+ MockMetricsReporter.CLUSTERID.set(metricsContext.contextLabels().get("kafka.cluster.id").toString)
+ MockMetricsReporter.BROKERID.set(metricsContext.contextLabels().get("kafka.broker.id").toString)
+ }
+
+ override def configure(configs: util.Map[String, _]): Unit = {}
+
+ }
+
+ object MockMetricsReporter {
+ val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
+ val BROKERID : AtomicReference[String] = new AtomicReference[String]
+ val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
+ }
+}
+
+class KafkaMetricsReporterTest extends ZooKeeperTestHarness {
+ var server: KafkaServerStartable = null
+ var config: KafkaConfig = null
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+ val props = TestUtils.createBrokerConfig(1, zkConnect)
+ props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
+ props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "true")
+ props.setProperty(KafkaConfig.BrokerIdProp, "-1")
+ config = KafkaConfig.fromProps(props)
+ server = KafkaServerStartable.fromProps(props, threadNamePrefix = Option(this.getClass.getName))
+ server.startup()
+ }
+
+ @Test
+ def testMetricsContextNamespacePresent(): Unit = {
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID)
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID)
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX)
+ assertEquals("kafka.server", KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
+
+ server.shutdown()
+ TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ server.shutdown()
+ CoreUtils.delete(config.logDirs)
+ super.tearDown()
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 15b38eb..7a7e4df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -25,6 +26,8 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
@@ -678,10 +681,12 @@ public class KafkaStreams implements AutoCloseable {
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
- final JmxReporter jmxReporter = new JmxReporter(JMX_PREFIX);
+ final JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals());
reporters.add(jmxReporter);
- metrics = new Metrics(metricConfig, reporters, time);
+ final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+ config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+ metrics = new Metrics(metricConfig, reporters, time, metricsContext);
streamsMetrics =
new StreamsMetricsImpl(metrics, clientId, config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG));
rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger(time);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 9c7c01f..95942be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serdes;
@@ -172,7 +173,8 @@ public class KafkaStreamsTest {
PowerMock.expectNew(Metrics.class,
anyObject(MetricConfig.class),
capture(metricsReportersCapture),
- anyObject(Time.class)
+ anyObject(Time.class),
+ anyObject(MetricsContext.class)
).andAnswer(() -> {
for (final MetricsReporter reporter : metricsReportersCapture.getValue()) {
reporter.init(Collections.emptyList());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 49cba19..5df51e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -28,8 +28,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.record.TimestampType;
@@ -575,7 +577,10 @@ public class StreamTaskTest {
testMetricsForBuiltInMetricsVersionLatest();
}
- final JmxReporter reporter = new JmxReporter("kafka.streams");
+ final JmxReporter reporter = new JmxReporter();
+ final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+ reporter.contextChange(metricsContext);
+
metrics.addReporter(reporter);
final String threadIdTag =
StreamsConfig.METRICS_LATEST.equals(builtInMetricsVersion) ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1643e03..91c3797 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -36,8 +36,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -408,7 +410,10 @@ public class StreamThreadTest {
"commit-rate", taskGroupName, descriptionIsNotVerified, taskTags)));
}
- final JmxReporter reporter = new JmxReporter("kafka.streams");
+ final JmxReporter reporter = new JmxReporter();
+ final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+ reporter.contextChange(metricsContext);
+
metrics.addReporter(reporter);
assertEquals(CLIENT_ID + "-StreamThread-1", thread.getName());
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,%s=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 9a2b9fd..4d666b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -145,7 +147,10 @@ public class MeteredKeyValueStoreTest {
@Test
public void testMetrics() {
init();
- final JmxReporter reporter = new JmxReporter("kafka.streams");
+ final JmxReporter reporter = new JmxReporter();
+ final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+ reporter.contextChange(metricsContext);
+
metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format(
"kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index d1f805c..7d558dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -20,7 +20,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -144,7 +146,10 @@ public class MeteredSessionStoreTest {
@Test
public void testMetrics() {
init();
- final JmxReporter reporter = new JmxReporter("kafka.streams");
+ final JmxReporter reporter = new JmxReporter();
+ final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+ reporter.contextChange(metricsContext);
+
metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format(
"kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index b32c331..5522855 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -147,7 +149,10 @@ public class MeteredTimestampedKeyValueStoreTest {
@Test
public void testMetrics() {
init();
- final JmxReporter reporter = new JmxReporter("kafka.streams");
+ final JmxReporter reporter = new JmxReporter();
+ final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+ reporter.contextChange(metricsContext);
+
metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format(
"kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 569203e..2e1fb12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -141,7 +143,10 @@ public class MeteredWindowStoreTest {
public void testMetrics() {
replay(innerStoreMock);
store.init(context, store);
- final JmxReporter reporter = new JmxReporter("kafka.streams");
+ final JmxReporter reporter = new JmxReporter();
+ final MetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
+ reporter.contextChange(metricsContext);
+
metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format(
"kafka.streams:type=%s,%s=%s,task-id=%s,%s-state-id=%s",