Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/18 23:42:50 UTC

[GitHub] [kafka] xiaodongdu opened a new pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

xiaodongdu opened a new pull request #8691:
URL: https://github.com/apache/kafka/pull/8691


   Implement KIP-606, add metadata context to MetricsReporter:
   Added a new API to MetricsReporter that allows clients to expose additional metadata fields to reporter plugins. Added a MetricsContext interface to encapsulate the metadata.
   Deprecated the JmxReporter(String prefix) constructor. The prefix is now passed to the reporter via MetricsContext.
   Replaced existing usages of JmxReporter(String prefix) with the default JmxReporter() constructor, passing the JMX prefix to MetricsContext under the _namespace key.
   From the Kafka broker, populate MetricsContext with: kafka.cluster.id and kafka.broker.id
   From Connect, populate MetricsContext with: connect.kafka.cluster.id, connect.group.id
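
For illustration only, the flow described above might look roughly like the following self-contained sketch. It uses stand-in types rather than the real Kafka classes: `SketchMetricsContext`, `SketchReporter`, `BrokerContextSketch`, and the sample values (`kafka.server`, `cluster-1`, `0`) are hypothetical. Only the `_namespace` key and the `kafka.cluster.id` / `kafka.broker.id` label names are taken from the description.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the MetricsContext interface described above (hypothetical, not the Kafka class).
interface SketchMetricsContext {
    String NAMESPACE = "_namespace";

    Map<String, String> metadata();
}

// Stand-in reporter: keeps the namespace as its prefix and the remaining entries as labels.
class SketchReporter {
    private String prefix = "";
    private Map<String, String> labels = Collections.emptyMap();

    void contextChange(SketchMetricsContext context) {
        Map<String, String> copy = new LinkedHashMap<>(context.metadata());
        String namespace = copy.remove(SketchMetricsContext.NAMESPACE);
        this.prefix = namespace == null ? "" : namespace;
        this.labels = Collections.unmodifiableMap(copy);
    }

    String prefix() {
        return prefix;
    }

    Map<String, String> labels() {
        return labels;
    }
}

class BrokerContextSketch {
    // Builds the kind of context a broker might pass; the namespace value is made up for the example.
    static SketchMetricsContext brokerContext(String clusterId, int brokerId) {
        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put(SketchMetricsContext.NAMESPACE, "kafka.server");
        metadata.put("kafka.cluster.id", clusterId);
        metadata.put("kafka.broker.id", Integer.toString(brokerId));
        return () -> Collections.unmodifiableMap(metadata);
    }

    public static void main(String[] args) {
        SketchReporter reporter = new SketchReporter();
        reporter.contextChange(brokerContext("cluster-1", 0));
        // Prints the prefix followed by the remaining labels.
        System.out.println(reporter.prefix() + " " + reporter.labels());
    }
}
```

The point of the sketch is that the reporter no longer needs a prefix constructor argument: everything it needs arrives through one metadata map.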
   
   Committer Checklist (excluded from commit message)
    Verify design and implementation
    Verify test coverage and CI build status
    Verify documentation (including upgrade notes)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429495993



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       Moved the definitions of kafka.cluster.id and kafka.broker.id to KafkaConfig.







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634860494


   ok to test





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429495910



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -318,4 +324,15 @@ public AttributeList setAttributes(AttributeList list) {
                                       + ".(whitelist/blacklist) is not a valid regular expression");
         }
     }
+
+    @Override
+    public void contextChange(MetricsContext metricsContext) {
+        Objects.requireNonNull(metricsContext.metadata().get(MetricsContext.NAMESPACE));

Review comment:
       Added a variable to save the value.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203369



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Removed clusterId from the backing store constructors.







[GitHub] [kafka] kkonstantine commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634882608


   ok to test





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429683527



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
##########
@@ -63,6 +63,7 @@
 
     private static final String STATUS_TOPIC = "status-topic";
     private static final String WORKER_ID = "localhost:8083";
+    private static final String CLUSTER_ID = "cluster-1";

Review comment:
       Removed







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202270



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);

Review comment:
       Removed clusterId from the KafkaStatusBackingStore constructor.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429683408



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -65,4 +68,12 @@ static String lookupKafkaClusterId(Admin adminClient) {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        //add all properties predefined with "metrics.context."
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        //add connect properties
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, config.originals().get(DistributedConfig.GROUP_ID_CONFIG));

Review comment:
       Added unit test in ConnectUtilsTest for StandAloneConfig and DistributedConfig.
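
As a rough, self-contained sketch of what the helper in the hunk above does, the following uses plain maps in place of `WorkerConfig`. The class name `MetricsContextPropsSketch` is hypothetical, and the label suffixes `connect.kafka.cluster.id` and `connect.group.id` are assumed to be the values of the `WorkerConfig` constants, based on the PR description. The null check for a missing `group.id` (as in a standalone worker) is a choice made for this sketch, not necessarily what the PR does.

```java
import java.util.HashMap;
import java.util.Map;

class MetricsContextPropsSketch {
    static final String METRICS_CONTEXT_PREFIX = "metrics.context.";

    // Copies every "metrics.context.*" entry (prefix kept) from the worker config,
    // then adds the Connect-specific labels under the same prefix.
    static void addMetricsContextProperties(Map<String, Object> props,
                                            Map<String, Object> workerConfig,
                                            String clusterId) {
        for (Map.Entry<String, Object> entry : workerConfig.entrySet()) {
            if (entry.getKey().startsWith(METRICS_CONTEXT_PREFIX)) {
                props.put(entry.getKey(), entry.getValue());
            }
        }
        props.put(METRICS_CONTEXT_PREFIX + "connect.kafka.cluster.id", clusterId);
        // Assumption for this sketch: skip the group label when no group.id is configured.
        Object groupId = workerConfig.get("group.id");
        if (groupId != null) {
            props.put(METRICS_CONTEXT_PREFIX + "connect.group.id", groupId);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> workerConfig = new HashMap<>();
        workerConfig.put("metrics.context.team", "payments"); // made-up user label
        workerConfig.put("group.id", "connect-cluster");
        Map<String, Object> producerProps = new HashMap<>();
        addMetricsContextProperties(producerProps, workerConfig, "cluster-1");
        System.out.println(producerProps);
    }
}
```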







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429510931



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       From his comment: "I think the ability to include "metrics.context.*" values as config is generally useful. Is it worth doing this for the broker as well?"
   It looks like I misunderstood what he meant. I just talked to him on Slack and will revert that change.







[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rnpridgeon commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429370889



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param metadata  metadata additional entries to add to the context.
+     *                  values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> metadata) {
+        this.metadata.put(MetricsContext.NAMESPACE, namespace);
+        metadata.forEach((key, value) -> this.metadata.put(key, value.toString()));

Review comment:
       Mostly I'm concerned about the case where a composite component shares labels with the underlying client it manages. If we allow the downstream client to overwrite such a label, we lose part of the upstream component's context.







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634860813


   ok to test





[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-635006306


   One seemingly unrelated failure on JDK11 with a known [flaky test](https://issues.apache.org/jira/browse/KAFKA-9516):
   ```
   kafka.api.PlaintextProducerSendTest.testNonBlockingProducer
       java.util.concurrent.TimeoutException: Timeout after waiting for 10000 ms.
   ```





[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634695460


   @xiaodongdu please address the conflicts.





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429495940



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -71,8 +72,13 @@ public JmxReporter() {
 
     /**
      * Create a JMX reporter that prefixes all metrics with the given string.
+     *  @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+     *  Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+     *  Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
      */
+    @Deprecated
     public JmxReporter(String prefix) {
+        Objects.requireNonNull(prefix);

Review comment:
       Change the logic here: if null, set it to space.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -71,8 +72,13 @@ public JmxReporter() {
 
     /**
      * Create a JMX reporter that prefixes all metrics with the given string.
+     *  @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+     *  Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+     *  Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
      */
+    @Deprecated
     public JmxReporter(String prefix) {
+        Objects.requireNonNull(prefix);

Review comment:
       Change the logic here: if null, set it to empty string.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429683340



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
##########
@@ -223,4 +235,12 @@ private void stop(boolean swallowException) {
         else
             log.debug("The Connect group member has stopped.");
     }
+
+    /**
+     * Method for unit tests
+     * @return
+     */
+    public Metrics getMetrics() {
+        return this.metrics;
+    }

Review comment:
       Renamed this method to metrics() and made it protected.







[GitHub] [kafka] rhauch commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r431194915



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       If there is no clear call pattern, then it's fine to not say anything. However, `JmxReporter.contextChange(...)` seems to assume that `contextChange(...)` will be called before any metrics are added via `init(...)`.
   
   If that call pattern is true, then I think we should document it. If it's also true it can be called later, then mention this as well. For example, the JavaDoc text on `contextChange(...)` could be something like:
   
   > Sets the context labels for the service or library that is exposing metrics. 
   > This will be called before {@link #init(List)} and may be called anytime after that.
   
   WDYT?
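
A minimal sketch of the call pattern discussed above, assuming `contextChange(...)` arrives before `init(...)` and may arrive again later. `OrderedReporterSketch` is a hypothetical stand-in, not `JmxReporter`; metrics are reduced to plain names, and the `namespace:metric` naming format is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class OrderedReporterSketch {
    private String namespace = "";
    private final List<String> metricNames = new ArrayList<>();

    // Expected to be called before init(...); a later call simply replaces the namespace.
    void contextChange(Map<String, String> metadata) {
        namespace = metadata.getOrDefault("_namespace", "");
    }

    // Registers the metrics that already exist when the reporter starts.
    void init(List<String> initialMetrics) {
        metricNames.addAll(initialMetrics);
    }

    // Names are derived on demand, so they always reflect the most recent context.
    List<String> registeredNames() {
        List<String> names = new ArrayList<>();
        for (String metric : metricNames) {
            names.add(namespace.isEmpty() ? metric : namespace + ":" + metric);
        }
        return names;
    }
}
```

Deriving the names lazily is one way to tolerate a later `contextChange(...)`; a reporter that registers names eagerly in `init(...)` would instead depend on the documented ordering.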







[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rnpridgeon commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428760555



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param metadata  metadata additional entries to add to the context.
+     *                  values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> metadata) {
+        this.metadata.put(MetricsContext.NAMESPACE, namespace);
+        metadata.forEach((key, value) -> this.metadata.put(key, value.toString()));

Review comment:
       Use `putIfAbsent` to avoid silently overwriting labels set upstream.
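
To make the suggestion concrete, here is a small self-contained sketch of merging labels so that upstream values win on key collisions. `LabelMergeSketch`, `mergeKeepingUpstream`, and the sample label names are hypothetical and are not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

class LabelMergeSketch {
    // Merges downstream labels into a copy of the upstream ones.
    // putIfAbsent keeps the upstream value whenever both sides define the same key.
    static Map<String, String> mergeKeepingUpstream(Map<String, String> upstream,
                                                    Map<String, ?> downstream) {
        Map<String, String> merged = new HashMap<>(upstream);
        downstream.forEach((key, value) -> merged.putIfAbsent(key, String.valueOf(value)));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> upstream = new HashMap<>();
        upstream.put("_namespace", "kafka.connect");
        upstream.put("cluster.id", "upstream-cluster");

        Map<String, Object> downstream = new HashMap<>();
        downstream.put("cluster.id", "downstream-cluster"); // collides with an upstream label
        downstream.put("client.id", 7);                      // new label, converted to a string

        System.out.println(mergeKeepingUpstream(upstream, downstream));
    }
}
```

With plain `put`, the colliding `cluster.id` would take the downstream value instead, which is the silent overwrite the comment warns about.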







[GitHub] [kafka] rhauch closed pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch closed pull request #8691:
URL: https://github.com/apache/kafka/pull/8691


   





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r430590488



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       @xvrl Do you mean adding this logic back to KafkaServer.scala?
   metadata.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       Changed name to kafkaClusterId

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
       Updated code to reflect KIP changes

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       Renamed jmxPrefix to metricsPrefix and added broker MetricsContext properties.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
##########
@@ -66,20 +67,22 @@ public void configure(final WorkerConfig config) {
         if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be specified");
 
+        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
         data = new HashMap<>();
 
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);

Review comment:
       Nice catch, fixed.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+
+        for (MetricsReporter reporter : member.metrics().reporters()) {
+            if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+                MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
+                assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+                assertEquals("group-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID));
+            }
+        }

Review comment:
       Added verification.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");

Review comment:
       Removed deprecated configs.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
##########
@@ -63,7 +67,7 @@
      * @param config   the worker configuration; may not be null
      * @param time     the time; may not be null
      */
-    public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+    public ConnectMetrics(String workerId, WorkerConfig config, Time time, String clusterId) {

Review comment:
       Updated javadoc.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
   }
 
+  private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext()
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+
+  private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false))

Review comment:
       Fixed

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An implementation of MetricsContext that encapsulates the required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's contextLabels map.
+     */
+    private final Map<String, String> contextLabels = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param contextLabels additional entries to add to the context;
+     *                      values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) {
+        this.contextLabels.put(MetricsContext.NAMESPACE, namespace);
+        contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString()));
+    }
+
+    public Map<String, String> contextLabels() {

Review comment:
       Added annotation

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional contextLabels about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The contextLabels map provides the following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns contextLabels fields
+     */
+    Map<String, String> contextLabels();

Review comment:
       Updated javadoc.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       It can be called multiple times. I'm not sure we should mention that in the Javadoc: the other methods in this class can also be called multiple times, and their Javadoc doesn't mention it either.
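
       Since the point here is that `contextChange` may be invoked more than once, a minimal sketch of a reporter that tolerates repeated calls might look like the following. `SketchMetricsContext` and `SketchReporter` are hypothetical stand-ins so the example compiles without Kafka on the classpath; only `contextLabels()` and the `_namespace` key are taken from this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MetricsContext; only contextLabels() and the
// "_namespace" key are taken from this PR.
interface SketchMetricsContext {
    String NAMESPACE = "_namespace";

    Map<String, String> contextLabels();
}

// Hypothetical reporter that tolerates repeated contextChange calls:
// each call replaces the previous snapshot instead of accumulating state.
class SketchReporter {
    private Map<String, String> labels = Collections.emptyMap();
    private int calls = 0;

    public synchronized void contextChange(SketchMetricsContext context) {
        // Defensive copy so later mutation by the caller cannot leak in.
        labels = Collections.unmodifiableMap(new HashMap<>(context.contextLabels()));
        calls++;
    }

    public synchronized String namespace() {
        return labels.getOrDefault(SketchMetricsContext.NAMESPACE, "");
    }

    public synchronized Map<String, String> labels() {
        return labels;
    }

    public synchronized int calls() {
        return calls;
    }
}
```

       Replacing the snapshot on every call, rather than merging, means a second call never leaves stale labels behind. Whether merging would be preferable is a design choice this sketch does not settle.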







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634339348


   ok to test





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203600



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
##########
@@ -93,7 +93,7 @@ public static void main(String[] args) {
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
             Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(),
-                                       connectorClientConfigOverridePolicy);
+                                       connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
       Removed clusterId from Worker constructor.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429510931



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       From his comment: "I think the ability to include "metrics.context.*" values as config is generally useful. Is it worth doing this for the broker as well?"
   It looks like I misunderstood what he meant. I just talked to him on Slack.
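
   For readers following along, here is a rough sketch of how `metrics.context.<key>=<value>` entries could be pulled out of a config map and turned into context labels. `MetricsContextConfigSketch` and `contextLabelsFrom` are hypothetical names, not Kafka APIs; stripping the prefix is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Kafka API: collects "metrics.context.<key>" entries
// from a config map and strips the prefix so reporters see only "<key>".
class MetricsContextConfigSketch {
    static final String PREFIX = "metrics.context.";

    static Map<String, String> contextLabelsFrom(Map<String, ?> configs) {
        Map<String, String> labels = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            // Skip unrelated settings, a bare prefix, and null values.
            if (key.startsWith(PREFIX) && key.length() > PREFIX.length() && value != null) {
                labels.put(key.substring(PREFIX.length()), value.toString());
            }
        }
        return labels;
    }
}
```

   With this helper, a config containing `metrics.context.team=payments` would yield a label `team` with value `payments`, while unrelated settings such as `bootstrap.servers` are ignored.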







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428351889



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -270,9 +272,15 @@ Duration adminTimeout() {
     List<MetricsReporter> metricsReporters() {
         List<MetricsReporter> reporters = getConfiguredInstances(
                 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(this.originals());
         reporters.add(jmxReporter);
+        MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");

Review comment:
       @rhauch we mentioned Connect more generally in the KIP, but I can update it to make this explicit.







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634860234


   retest this please





[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-631035724


   ok to test





[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rnpridgeon commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429216813



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -16,19 +16,24 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public final class ConnectUtils {
     private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
+    public static final String CONNECT_KAFKA_CLUSTER_ID = "connect.kafka.cluster.id";
+    public static final String CONNECT_GROUP_ID = "connect.group.id";

Review comment:
       Maybe it would be helpful to add CONNECT_VERSION as well. 







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429442787



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides the following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
       any suggestions? maybe `contextLabels`?







[GitHub] [kafka] rhauch merged pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #8691:
URL: https://github.com/apache/kafka/pull/8691


   





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202532



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);

Review comment:
       Removed clusterId from KafkaOffsetBackingStore constructor.







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429440655



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides the following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {

Review comment:
       I think having an interface gives us more flexibility to evolve the API, without breaking backwards compatibility.
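
       As a small illustration of that flexibility, the sketch below shows an interface gaining a method later via a `default` implementation, so an implementation written against the original shape still compiles and works. `EvolvingContext`, `LegacyContext`, and `labelOrDefault` are hypothetical names invented for this example, not part of the PR.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical interface mirroring the shape discussed in this PR.
interface EvolvingContext {
    Map<String, String> contextLabels();

    // Hypothetical later addition: because it is a default method, existing
    // implementations do not need to change to pick it up.
    default String labelOrDefault(String key, String fallback) {
        return contextLabels().getOrDefault(key, fallback);
    }
}

// An implementation written before labelOrDefault existed: it only knows
// about contextLabels(), yet it inherits the new method automatically.
class LegacyContext implements EvolvingContext {
    @Override
    public Map<String, String> contextLabels() {
        return Collections.singletonMap("_namespace", "kafka.consumer");
    }
}
```

       A concrete class handed directly to reporters would offer less room for this kind of change, since alternative implementations could not be swapped in behind the same type.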







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634884341









[GitHub] [kafka] kkonstantine commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634882817


   retest this please





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203009



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);

Review comment:
       Removed clusterId from KafkaStatusBackingStore constructor.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428255598



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);

Review comment:
       Changed KafkaOffsetBackingStore to get kafkaClusterId from ConnectUtils.lookupKafkaClusterId in its configure(...) method.







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634917119


   ok to test





[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634863654


   retest this please





[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634836169


   ok to test





[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634863453


   ok to test





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429495778



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An implementation of MetricsContext that encapsulates the required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();

Review comment:
       > Thanks for the patch @xiaodongdu!
   > 
   > I reviewed the client and broker sections of code as well as the new interface. Comments inline.
   > 
   > I think the ability to include "metrics.context.*" values as config is generally useful. Is it worth doing this for the broker as well?
   > 
   > Also, in case you didn't know, you can use [Markdown](https://guides.github.com/features/mastering-markdown/) in the PR description to format it and make it a little more readable.
   
   Moved String constants into KafkaConfig.
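
   For readers unfamiliar with the `metrics.context.*` convention mentioned in the quoted review, here is a minimal sketch of how such properties could be collected into a metadata map. The class `MetricsContextProps` and its `extract` method are hypothetical and not part of this PR; stripping the prefix from the keys is also an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: collects
// metrics.context.<key>=<value> entries from client properties.
final class MetricsContextProps {
    static final String PREFIX = "metrics.context.";

    // Returns a new map holding only the prefixed entries.
    // Assumption: the prefix is stripped from each key.
    static Map<String, String> extract(Map<String, ?> clientProps) {
        Map<String, String> labels = new HashMap<>();
        for (Map.Entry<String, ?> e : clientProps.entrySet()) {
            String key = e.getKey();
            boolean hasSuffix = key.startsWith(PREFIX) && key.length() > PREFIX.length();
            if (hasSuffix && e.getValue() != null) {
                labels.put(key.substring(PREFIX.length()), String.valueOf(e.getValue()));
            }
        }
        return labels;
    }
}
```

   With this sketch, a property such as `metrics.context.team=payments` would end up in the map under the key `team`, while unrelated properties are ignored.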







[GitHub] [kafka] xiaodongdu commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-630496674


   @xvrl @rhauch @ijuma Could you please review this PR for KIP-606? Thanks.





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202742



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
       Removed clusterId from Worker constructor.







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429503342



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       Any reason you moved some constants but not `jmxPrefix`? I think it would make sense to keep all the metrics context constants in one place. I didn't see any comment from @mumrah suggesting to move them.







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429442671



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides the following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics,
+ *   e.g. kafka.server, kafka.consumer;
+ *   {@link JmxReporter} uses this as the prefix for MBean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix

Review comment:
       I think the interface is a natural way to expose predefined constants an API might need. I don't see a need to have a separate class for this yet.
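
   As a rough illustration of the point about exposing predefined constants on the interface, the sketch below mirrors the interface shape quoted in the diff (`NAMESPACE` and `metadata()`). The implementing class `SimpleMetricsContext` is hypothetical and exists only for this example; it is not the PR's `KafkaMetricsContext`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Mirrors the interface shape quoted in the diff above.
interface MetricsContext {
    // Fields declared on an interface are implicitly public static final.
    String NAMESPACE = "_namespace";

    Map<String, String> metadata();
}

// Hypothetical implementation, for illustration only.
final class SimpleMetricsContext implements MetricsContext {
    private final Map<String, String> metadata;

    SimpleMetricsContext(String namespace, Map<String, String> extra) {
        Map<String, String> copy = new HashMap<>(extra);
        // The constant is inherited from the interface, so no separate
        // constants class is needed.
        copy.put(NAMESPACE, namespace);
        this.metadata = Collections.unmodifiableMap(copy);
    }

    @Override
    public Map<String, String> metadata() {
        return metadata;
    }
}
```

   Callers can refer to the key as `MetricsContext.NAMESPACE`, which keeps the constant next to the API that defines its meaning.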







[GitHub] [kafka] rhauch removed a comment on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch removed a comment on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634861096









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r431267044



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       Sounds good. Updated javadoc.







[GitHub] [kafka] rhauch commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r426971298



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -270,9 +272,15 @@ Duration adminTimeout() {
     List<MetricsReporter> metricsReporters() {
         List<MetricsReporter> reporters = getConfiguredInstances(
                 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(this.originals());
         reporters.add(jmxReporter);
+        MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");

Review comment:
       I don't think the KIP mentioned this `kafka.connect.mirror` metrics context. It's probably worthwhile to update the KIP and then notify the vote thread of the minor change noticed during implementation.

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);

Review comment:
       Do we need to modify the `KafkaOffsetBackingStore()` constructor? The `ConnectUtils.lookupKafkaClusterId(...)` can be called with the `WorkerConfig` (which is the parent class of `DistributedConfig`) passed to it via the `configure(...)` method, so couldn't the `configure(...)` method call the lookup method?
   
   This may seem minor, but it follows the existing pattern for this class.
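
   The pattern suggested here can be sketched roughly as follows. All names are stand-ins: `SketchBackingStore` is hypothetical, the config is reduced to a plain map, and the static `lookupKafkaClusterId` only imitates `ConnectUtils.lookupKafkaClusterId(...)` by reading a made-up key (`sketch.cluster.id`) instead of querying a cluster.

```java
import java.util.Map;

// Hypothetical stand-in for a backing store; names are illustrative only.
final class SketchBackingStore {
    // Resolved in configure(), not passed to the constructor.
    private String kafkaClusterId;

    // The constructor signature stays unchanged.
    SketchBackingStore() { }

    void configure(Map<String, String> workerConfig) {
        this.kafkaClusterId = lookupKafkaClusterId(workerConfig);
    }

    // Stand-in for ConnectUtils.lookupKafkaClusterId(...). This sketch
    // simply reads a made-up key from the map (an assumption).
    static String lookupKafkaClusterId(Map<String, String> workerConfig) {
        return workerConfig.getOrDefault("sketch.cluster.id", "unknown");
    }

    String kafkaClusterId() {
        return kafkaClusterId;
    }
}
```

   The design point is that callers keep writing `new SketchBackingStore()` followed by `configure(config)`, so no call site has to change when the store needs one more value derived from the config.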

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);

Review comment:
       Could the `KafkaStatusBackingStore(...)` get the cluster ID from the `distributedConfig` passed into the `configure(...)` method, similar to the `KafkaOffsetBackingStore`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the worker's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Could the `KafkaConfigBackingStore(...)` get the cluster ID from the `distributedConfig`? One of the reasons why we pass the whole worker config to the constructor is so that we don't have to always modify the constructor to pass in additional information that can be derived from the worker config.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       The Kafka cluster ID is passed into the constructor, but is this field supposed to represent the Connect cluster ID or the Kafka cluster ID? Since this is in Connect code, without further context we'd assume it was the Connect cluster ID.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
##########
@@ -93,7 +93,7 @@ public static void main(String[] args) {
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
             Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(),
-                                       connectorClientConfigOverridePolicy);
+                                       connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the worker's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's `configure(...)` method could get the Kafka cluster ID directly from the worker config.







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428352951



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -270,9 +272,15 @@ Duration adminTimeout() {
     List<MetricsReporter> metricsReporters() {
         List<MetricsReporter> reporters = getConfiguredInstances(
                 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(this.originals());
         reporters.add(jmxReporter);
+        MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");

Review comment:
       The KIP mentions that we are deprecating setting the JMX prefix directly on the `JmxReporter`, and are instead passing it via the metrics context as the `_namespace` parameter. This doesn't change the prefix or how the metrics are exposed in JMX.
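
   To make the equivalence concrete, here is a small sketch under stated assumptions. `PrefixAwareReporter` is a hypothetical stand-in, not the real `JmxReporter`; the context is reduced to a plain map, and the `prefix:type=group` name format is assumed purely for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for a reporter; the real JmxReporter is not
// reproduced here.
final class PrefixAwareReporter {
    private String prefix = "";

    // Old style: prefix fixed at construction (the form being deprecated).
    PrefixAwareReporter(String prefix) {
        this.prefix = prefix;
    }

    // New style: no prefix until the context is supplied.
    PrefixAwareReporter() { }

    // Picks up the prefix from the "_namespace" entry, if present.
    void contextChange(Map<String, String> contextMetadata) {
        String namespace = contextMetadata.get("_namespace");
        if (namespace != null) {
            prefix = namespace;
        }
    }

    // Assumed name format for this sketch: <prefix>:type=<group>.
    String mbeanName(String group) {
        return prefix + ":type=" + group;
    }
}
```

   In this sketch, constructing with `"kafka.connect.mirror"` and calling `contextChange` with `_namespace=kafka.connect.mirror` yield the same names, which is the sense in which the exposed prefix does not change.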







[GitHub] [kafka] mumrah commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r430482879



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       This comment was just about the `jmxPrefix` variable. I realize it's pre-existing code, but now that it's not specific to JMX, I suggested renaming it. 

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides the following information:
+ * - a <code>_namespace</code> field indicating the component exposing metrics,
+ *   e.g. kafka.server, kafka.consumer;
+ *   {@link JmxReporter} uses this as the prefix for MBean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>`
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
       That sounds good

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {

Review comment:
       I wasn't suggesting that we eliminate the interface, I definitely think having one is a good choice (for the reasons you mentioned). What I meant was in this interface, we expose a Map as the collection of metrics tags/labels. But since it appears that the usage is intended to be read-only, maybe a Map isn't the best choice. Here's what I was thinking:
   
   ```java
   interface MetricsContext {
     String namespace();
     String get(String field);
     Collection<String> fields();
   }
   ```
   
   (included the namespace suggestion from my other comment as well).
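
   A minimal sketch of how a read-only context of this shape might be implemented, assuming the three methods proposed above. The names `ReadOnlyMetricsContext` and `MapBackedMetricsContext` are hypothetical and used only for illustration; they are not part of the PR.

   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical read-only interface mirroring the shape suggested above.
   interface ReadOnlyMetricsContext {
       String namespace();
       String get(String field);
       Collection<String> fields();
   }

   // Hypothetical implementation: copies the labels once and never exposes the map itself.
   final class MapBackedMetricsContext implements ReadOnlyMetricsContext {
       private final String namespace;
       private final Map<String, String> labels;

       MapBackedMetricsContext(String namespace, Map<String, String> labels) {
           this.namespace = namespace;
           // Defensive copy, so later changes made by the caller are not visible here.
           this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
       }

       @Override
       public String namespace() {
           return namespace;
       }

       @Override
       public String get(String field) {
           return labels.get(field);
       }

       @Override
       public Collection<String> fields() {
           // The key set of an unmodifiable map is itself unmodifiable.
           return labels.keySet();
       }
   }
   ```

   With this shape a reporter can enumerate and read labels but has no way to mutate them, which matches the write-once usage discussed here.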

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix

Review comment:
       Sorry, I shouldn't have said "field" since that implies a concrete class. See the comment above for an example of what I meant.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -71,8 +72,13 @@ public JmxReporter() {
 
     /**
      * Create a JMX reporter that prefixes all metrics with the given string.
+     *  @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+     *  Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+     *  Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
      */
+    @Deprecated

Review comment:
       Sounds good, thanks 👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429683348



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java
##########
@@ -59,6 +59,7 @@
     private static final String FOO_TOPIC = "foo-topic";
     private static final String FOO_CONNECTOR = "foo-source";
     private static final String BAR_TOPIC = "bar-topic";
+    private static final String CLUSTER_ID = "cluster-1";

Review comment:
       Removed.







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429366836



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param metadata  metadata additional entries to add to the context.
+     *                  values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> metadata) {
+        this.metadata.put(MetricsContext.NAMESPACE, namespace);
+        metadata.forEach((key, value) -> this.metadata.put(key, value.toString()));

Review comment:
       I think it's ok for the component that owns the reporter to take precedence over the labels passed from upstream. We did not specify the behavior in the KIP, so implementations should use namespacing of labels to avoid this. If in practice we find this behavior is less desirable, we can file a follow-on KIP, since the interface is still evolving.
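
   To illustrate the precedence described in this comment, here is a minimal sketch. It assumes labels are merged into a plain `HashMap`, where the last `put` for a key wins. `LabelMerge` and `merge` are hypothetical names, not code from the PR.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class LabelMerge {
       // Hypothetical helper: upstream labels are applied first, then the
       // component's own labels, so the component wins on key collisions.
       static Map<String, String> merge(Map<String, ?> upstream, Map<String, ?> component) {
           Map<String, String> merged = new HashMap<>();
           upstream.forEach((key, value) -> merged.put(key, String.valueOf(value)));
           component.forEach((key, value) -> merged.put(key, String.valueOf(value)));
           return merged;
       }
   }
   ```

   Namespaced keys such as `connect.kafka.cluster.id` and `kafka.cluster.id` never collide in the first place, which is why namespacing sidesteps the question of who wins.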







[GitHub] [kafka] mumrah commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429387498



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -318,4 +324,15 @@ public AttributeList setAttributes(AttributeList list) {
                                       + ".(whitelist/blacklist) is not a valid regular expression");
         }
     }
+
+    @Override
+    public void contextChange(MetricsContext metricsContext) {
+        Objects.requireNonNull(metricsContext.metadata().get(MetricsContext.NAMESPACE));

Review comment:
       nit: just read the value from the Map once
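
   One way to do this, sketched with a plain `Map` standing in for `metricsContext.metadata()`. The class `NamespaceLookup` and the method `requireNamespace` are hypothetical names for illustration only.

   ```java
   import java.util.Map;
   import java.util.Objects;

   final class NamespaceLookup {
       static final String NAMESPACE = "_namespace";

       // Look the key up a single time, then validate and reuse the local value.
       static String requireNamespace(Map<String, String> metadata) {
           String namespace = metadata.get(NAMESPACE);
           return Objects.requireNonNull(namespace, "_namespace must be set");
       }
   }
   ```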

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();

Review comment:
       Can be `final`

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -71,8 +72,13 @@ public JmxReporter() {
 
     /**
      * Create a JMX reporter that prefixes all metrics with the given string.
+     *  @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+     *  Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+     *  Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
      */
+    @Deprecated
     public JmxReporter(String prefix) {
+        Objects.requireNonNull(prefix);

Review comment:
       Should we throw an exception here, or just replace the null value with an empty string?
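
   The two options can be compared in a short sketch. `PrefixHandling` is a hypothetical class used only for illustration; both methods rely solely on `java.util.Objects`.

   ```java
   import java.util.Objects;

   final class PrefixHandling {
       // Option 1: fail fast with a NullPointerException on a null prefix.
       static String strict(String prefix) {
           return Objects.requireNonNull(prefix, "prefix must not be null");
       }

       // Option 2: silently fall back to an empty prefix.
       static String lenient(String prefix) {
           return Objects.toString(prefix, "");
       }
   }
   ```

   The strict variant surfaces a misconfiguration at construction time, while the lenient variant keeps the reporter running but yields mbean names without a prefix.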

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
       Metadata is very overloaded, can we think of a different name here?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {

Review comment:
       Looking through the PR, it seems that MetricsContext is a short-lived object used to pass values to the MetricsReporters as they are constructed. Since the usage appears to be write-once, it might be better to expose a subset of Map rather than the full thing. E.g., `String get(String field)` and `Iterator<String> fields()` or something.
   
   If we think this might evolve into a mutable long-lived object, then a Map is probably better. Just a thought.
   
   
    

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix

Review comment:
       Should we define this as a field on the interface? 

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -71,8 +72,13 @@ public JmxReporter() {
 
     /**
      * Create a JMX reporter that prefixes all metrics with the given string.
+     *  @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+     *  Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+     *  Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
      */
+    @Deprecated

Review comment:
       @xvrl can we remove the explicit construction of JmxReporter and rely on the plugin loading mechanism after we remove this non-zero-arg constructor? Maybe something for 3.0?

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       Should we rename jmxPrefix to metricsPrefix?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,13 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Callback method providing context metadata for the

Review comment:
       Not really a "callback" in the conventional sense. Maybe just "Provides context metadata for the ..."







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428205238



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       I'm still keeping clusterId as a class variable in Worker, since we use clusterId in other methods of this class. We get clusterId from ConnectUtils.lookupKafkaClusterId inside the Worker constructor and keep it as a class variable.







[GitHub] [kafka] rnpridgeon commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rnpridgeon commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-631074323


   Since MetricsReporter instances are the ultimate consumers of the MetricsContext instance, I would like to propose moving construction of the MetricsContext there. This would allow each reporter to manage its own set of labels, increasing encapsulation and extensibility.





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429444451



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       jmxPrefix is from the existing codebase: https://github.com/apache/kafka/blob/ec205171e5fddbae2ebc1784adf73a6732889a49/core/src/main/scala/kafka/server/KafkaServer.scala#L132
   
   We can move the comment beneath it so that it is not confusing.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429495993



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       Moved these definitions to KafkaConfig.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429683475



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -65,4 +68,12 @@ static String lookupKafkaClusterId(Admin adminClient) {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        //add all properties predefined with "metrics.context."
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        //add connect properties
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, config.originals().get(DistributedConfig.GROUP_ID_CONFIG));

Review comment:
       I'll leave the KIP update to @xvrl
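
   For readers following the diff above, a minimal sketch of the prefix handling. It assumes `metrics.context.` as the prefix and uses a plain map in place of `WorkerConfig`. `MetricsContextProps` and `collect` are hypothetical names, and skipping a null group id is a choice made for this sketch, not behavior taken from the PR.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class MetricsContextProps {
       static final String PREFIX = "metrics.context.";

       // Copy every entry whose key starts with the prefix (prefix kept),
       // then add the Connect-specific labels under the same prefix.
       static Map<String, Object> collect(Map<String, ?> originals, String clusterId, Object groupId) {
           Map<String, Object> out = new HashMap<>();
           originals.forEach((key, value) -> {
               if (key.startsWith(PREFIX)) {
                   out.put(key, value);
               }
           });
           out.put(PREFIX + "connect.kafka.cluster.id", clusterId);
           if (groupId != null) {
               // Sketch-only guard: omit the label when no group id is configured.
               out.put(PREFIX + "connect.group.id", groupId);
           }
           return out;
       }
   }
   ```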







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634861096


   retest this please





[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634898904


   retest this please





[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429432997



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param metadata  metadata additional entries to add to the context.
+     *                  values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> metadata) {
+        this.metadata.put(MetricsContext.NAMESPACE, namespace);
+        metadata.forEach((key, value) -> this.metadata.put(key, value.toString()));

Review comment:
       The client currently injects only the labels passed in via client properties, so that wouldn't happen.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202016



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Removed clusterId from KafkaConfigBackingStore constructor.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429476820



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -16,19 +16,24 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public final class ConnectUtils {
     private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
+    public static final String CONNECT_KAFKA_CLUSTER_ID = "connect.kafka.cluster.id";
+    public static final String CONNECT_GROUP_ID = "connect.group.id";

Review comment:
       Version is added in ce-kafka.  







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r430557121



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {

Review comment:
       I think keeping a Map interface makes it more convenient to work with. It gives you all the helper methods and streaming interfaces rather than having to hand-roll those things for someone consuming the api.
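
       To illustrate the convenience argument, here is a minimal sketch of how a reporter might consume the metadata map with the standard `Map` and stream helpers. `SketchMetricsContext` and `ContextLabels` are hypothetical stand-ins invented for this example, not classes from the PR; only the `metadata()` shape and the `_namespace` key are taken from the diff.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for the MetricsContext interface proposed in this PR.
interface SketchMetricsContext {
    Map<String, String> metadata();
}

// Hypothetical helper a reporter plugin might write; not part of Kafka.
final class ContextLabels {
    private ContextLabels() { }

    // Render every context field except _namespace as "key=value",
    // sorted by key and joined with commas.
    static String render(SketchMetricsContext context) {
        return new TreeMap<>(context.metadata()).entrySet().stream()
                .filter(e -> !"_namespace".equals(e.getKey()))
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }
}
```

       Because the context exposes a plain `Map<String, String>`, the filtering, sorting, and joining above need no extra accessor methods on the interface.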

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix

Review comment:
       I'm not convinced we should give namespace a special status over other fields; it just happens to be the only one we currently define by default, for backwards-compatibility reasons. If we find ourselves adding more of those, I agree it would be worth revisiting how we expose pre-defined fields.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       @xiaodongdu what @mumrah was saying is that we could add the properties to the Kafka broker configuration and pass those values into the context, similar to what we do for clients. We can update the KIP to add that.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -65,4 +68,12 @@ static String lookupKafkaClusterId(Admin adminClient) {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        //add all properties predefined with "metrics.context."
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        //add connect properties
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, config.originals().get(DistributedConfig.GROUP_ID_CONFIG));

Review comment:
       @rhauch I've updated the KIP document and will send a follow-up email to announce the changes once we finalize @mumrah's feedback.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -129,7 +129,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  //properties for MetricsContext

Review comment:
       Yes, I believe that's what @mumrah meant. If we decide to do this, I'll update the KIP and send a follow-up email with some of the other tweaks we discussed in this PR.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional metadata about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The metadata map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns metadata fields
+     */
+    Map<String, String> metadata();

Review comment:
       ok, I updated the KIP to reflect this change.

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -384,6 +390,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
   }
 
+  private[server] def notifyMetricsReporters(metricsReporters: Seq[AnyRef]): Unit = {
+    val metricsContext = createKafkaMetricsContext()
+    metricsReporters.foreach {
+      case x: MetricsReporter => x.contextChange(metricsContext)
+      case _ => //do nothing
+    }
+  }
+
+  private[server] def createKafkaMetricsContext() : KafkaMetricsContext = {
+    val contextLabels = new util.HashMap[String, Object]
+    contextLabels.put(KAFKA_CLUSTER_ID, clusterId)
+    contextLabels.put(KAFKA_BROKER_ID, config.brokerId.toString)
+    contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false))

Review comment:
       The prefix should be stripped before adding the fields to the context.
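
       As a rough sketch of what stripping means here: a property such as `metrics.context.team=payments` would end up in the context under the key `team`. The helper below is a hypothetical, dependency-free illustration, and the prefix value is assumed from the `metrics.context.<key>=<value>` form in the MetricsContext javadoc. In the PR itself the same effect presumably comes from passing `true` as the second argument of `originalsWithPrefix`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka.
final class PrefixStripper {
    // Assumed to match CommonClientConfigs.METRICS_CONTEXT_PREFIX.
    static final String METRICS_CONTEXT_PREFIX = "metrics.context.";

    private PrefixStripper() { }

    // Keep only entries whose key starts with the prefix, and drop the
    // prefix from each retained key.
    static Map<String, Object> stripPrefix(Map<String, ?> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

       Without stripping, a reporter would see the label as `metrics.context.team` on the broker, which would not line up with the unprefixed keys such as `kafka.cluster.id`.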

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       Someone on the mailing list commented that we might want to name this `contextChanged` (past tense). I don't have a strong feeling either way. Do you have any thoughts, @mumrah @rhauch?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       Due to the way `JmxReporter` is initialized in Kafka today, it already gets called both before and after `init()`.
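
       Assuming "it" refers to `contextChange`, a reporter implementation probably should not rely on any particular ordering relative to `init()`. The sketch below shows one way to tolerate either order. `OrderTolerantReporter` is a hypothetical class, its `init()` and `contextChange(...)` methods take simplified arguments rather than the real `MetricsReporter` signatures, and the `"unknown"` fallback is invented for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical reporter; the methods stand in for MetricsReporter callbacks.
final class OrderTolerantReporter {
    // Latest context labels; empty until the first contextChange call.
    private volatile Map<String, String> labels = Collections.emptyMap();
    private volatile boolean initialized = false;

    void init() {
        initialized = true;
    }

    void contextChange(Map<String, String> metadata) {
        // Copy defensively and replace wholesale, so the most recent call wins
        // whether it arrives before or after init().
        labels = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    // Namespace used when naming metrics; falls back to a placeholder
    // if no context has been delivered yet.
    String namespace() {
        return labels.getOrDefault("_namespace", "unknown");
    }

    boolean isInitialized() {
        return initialized;
    }
}
```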







[GitHub] [kafka] rhauch commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429647155



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java
##########
@@ -59,6 +59,7 @@
     private static final String FOO_TOPIC = "foo-topic";
     private static final String FOO_CONNECTOR = "foo-source";
     private static final String BAR_TOPIC = "bar-topic";
+    private static final String CLUSTER_ID = "cluster-1";

Review comment:
       This is unused.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+
+        for (MetricsReporter reporter : member.getMetrics().reporters()) {
+            if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+                MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
+                assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+                assertEquals("group-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID));
+            }
+        }
+
+        MetricName name = member.getMetrics().metricName("test.avg", "grp1");
+        member.getMetrics().addMetric(name, new Avg());

Review comment:
       Per a previous suggestion:
   ```suggestion
           member.metrics().addMetric(name, new Avg());
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -65,4 +68,12 @@ static String lookupKafkaClusterId(Admin adminClient) {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        //add all properties predefined with "metrics.context."
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        //add connect properties
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, config.originals().get(DistributedConfig.GROUP_ID_CONFIG));

Review comment:
       Speaking of this, it'd probably be worth adding unit tests for this method with both `StandaloneConfig` and `DistributedConfig` objects to verify this method adds the right properties to the supplied map.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+
+        for (MetricsReporter reporter : member.getMetrics().reporters()) {
+            if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+                MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
+                assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+                assertEquals("group-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID));
+            }
+        }
+
+        MetricName name = member.getMetrics().metricName("test.avg", "grp1");

Review comment:
       Per a previous suggestion:
   ```suggestion
           MetricName name = member.metrics().metricName("test.avg", "grp1");
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -153,6 +164,7 @@ public void setup() {
         workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
         mockFileProviderTestId = UUID.randomUUID().toString();
         workerProps.put("config.providers.file.param.testId", mockFileProviderTestId);
+        workerProps.put("group.id", GROUP_ID);

Review comment:
       This `group.id` property is not defined in `StandaloneConfig`. See my earlier comment.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
##########
@@ -223,4 +235,12 @@ private void stop(boolean swallowException) {
         else
             log.debug("The Connect group member has stopped.");
     }
+
+    /**
+     * Method for unit tests
+     * @return
+     */
+    public Metrics getMetrics() {
+        return this.metrics;
+    }

Review comment:
       Nit: this doesn't follow our conventions. We don't use `get*`-style getters, and if this is only used in unit tests we should make it package-private instead. This should be:
   ```suggestion
       // Visible for testing
       Metrics metrics() {
           return this.metrics;
       }
   ```
   You'll have to change the `WorkerGroupMemberTest` accordingly.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+
+        for (MetricsReporter reporter : member.getMetrics().reporters()) {

Review comment:
       Per a previous suggestion:
   ```suggestion
           for (MetricsReporter reporter : member.metrics().reporters()) {
   ```
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -65,4 +68,12 @@ static String lookupKafkaClusterId(Admin adminClient) {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    public static void addMetricsContextProperties(Map<String, Object> prop, WorkerConfig config, String clusterId) {
+        //add all properties predefined with "metrics.context."
+        prop.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX, false));
+        //add connect properties
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, clusterId);
+        prop.put(CommonClientConfigs.METRICS_CONTEXT_PREFIX + WorkerConfig.CONNECT_GROUP_ID, config.originals().get(DistributedConfig.GROUP_ID_CONFIG));

Review comment:
       Standalone does not have a `group.id` property, since each standalone worker is its own single-node cluster. That means that this line can put a null value into the `prop` map for the `metrics.context.connect.group.id` entry.
   
   Unfortunately, there is no identifier for standalone workers. Is the `metrics.context.connect.group.id` property really required if the standalone worker doesn't use any kind of coordination? It seems from [other changes in `ConnectMetrics`](https://github.com/apache/kafka/pull/8691/files#diff-bb4c06b85c34f39a0c6bb771c4566759R90-R92) that it is not required. In that case, we should update the KIP to reflect that this property is only added in Connect distributed mode, and we should correct the logic above to add this property only if it is non-null, similar to the [changes in `ConnectMetrics`](https://github.com/apache/kafka/pull/8691/files#diff-bb4c06b85c34f39a0c6bb771c4566759R90-R92).
   
   If it is required, then we'll have to figure out something else.
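
   For illustration, a minimal sketch of the non-null guard discussed in this comment. It is not the PR's code: the class name `MetricsContextProps` is hypothetical, plain maps stand in for `WorkerConfig`, and the string constants simply mirror the keys named in this thread.

```java
import java.util.Map;

// Sketch only: MetricsContextProps is a hypothetical stand-in, not a Kafka class.
final class MetricsContextProps {
    static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
    static final String CONNECT_KAFKA_CLUSTER_ID = "connect.kafka.cluster.id";
    static final String CONNECT_GROUP_ID = "connect.group.id";
    static final String GROUP_ID_CONFIG = "group.id";

    // 'originals' stands in for WorkerConfig.originals() (assumption of this sketch).
    static void addMetricsContextProperties(Map<String, Object> prop,
                                            Map<String, ?> originals,
                                            String clusterId) {
        // Copy every user-supplied "metrics.context.*" entry, keeping the prefix.
        for (Map.Entry<String, ?> e : originals.entrySet()) {
            if (e.getKey().startsWith(METRICS_CONTEXT_PREFIX)) {
                prop.put(e.getKey(), e.getValue());
            }
        }
        // The Kafka cluster ID is always known at this point.
        prop.put(METRICS_CONTEXT_PREFIX + CONNECT_KAFKA_CLUSTER_ID, clusterId);
        // Standalone workers have no group.id, so add the entry only when it exists.
        Object groupId = originals.get(GROUP_ID_CONFIG);
        if (groupId != null) {
            prop.put(METRICS_CONTEXT_PREFIX + CONNECT_GROUP_ID, groupId);
        }
    }
}
```

   With this guard, a standalone configuration produces no `metrics.context.connect.group.id` entry at all instead of an entry whose value is null.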

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
##########
@@ -63,6 +63,7 @@
 
     private static final String STATUS_TOPIC = "status-topic";
     private static final String WORKER_ID = "localhost:8083";
+    private static final String CLUSTER_ID = "cluster-1";

Review comment:
       This is unused.







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429439813



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -71,8 +72,13 @@ public JmxReporter() {
 
     /**
      * Create a JMX reporter that prefixes all metrics with the given string.
+     *  @deprecated Since 2.6.0. Use {@link JmxReporter#JmxReporter()}
+     *  Initialize JmxReporter with {@link JmxReporter#contextChange(MetricsContext)}
+     *  Populate prefix by adding _namespace/prefix key value pair to {@link MetricsContext}
      */
+    @Deprecated

Review comment:
       Yes, I agree. We'll have to decide whether to make the config include this reporter by default or do something else. There are some backwards compatibility implications, but it is probably better to have a separate discussion for this.







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634899043


   retest this please





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203784



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       Removed clusterId from Worker constructor.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429496027



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,13 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Callback method providing context metadata for the

Review comment:
       Fixed the Javadoc.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429683370



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
##########
@@ -153,6 +164,7 @@ public void setup() {
         workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
         mockFileProviderTestId = UUID.randomUUID().toString();
         workerProps.put("config.providers.file.param.testId", mockFileProviderTestId);
+        workerProps.put("group.id", GROUP_ID);

Review comment:
       Removed group.id and related check. 







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634880127


   retest this please





[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428201738



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);

Review comment:
       Removed clusterId from KafkaOffsetBackingStore constructor.







[GitHub] [kafka] rhauch commented on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634930412


   retest this please





[GitHub] [kafka] rhauch removed a comment on pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch removed a comment on pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#issuecomment-634860494









[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rnpridgeon commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429216813



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -16,19 +16,24 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public final class ConnectUtils {
     private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
+    public static final String CONNECT_KAFKA_CLUSTER_ID = "connect.kafka.cluster.id";
+    public static final String CONNECT_GROUP_ID = "connect.group.id";

Review comment:
       CONNECT_VERSION?







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429495778



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();

Review comment:
       > Thanks for the patch @xiaodongdu!
   > 
   > I reviewed the client and broker sections of code as well as the new interface. Comments inline.
   > 
   > I think the ability to include "metrics.context.*" values as config is generally useful. Is it worth doing this for the broker as well?
   > 
   > Also, in case you didn't know, you can use [Markdown](https://guides.github.com/features/mastering-markdown/) in the PR description to format it and make it a little more readable.
   
   Moved the String constants into KafkaConfig. Also changed this to be final.







[GitHub] [kafka] rhauch commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r430667093



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
##########
@@ -63,7 +67,7 @@
      * @param config   the worker configuration; may not be null
      * @param time     the time; may not be null
      */
-    public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
+    public ConnectMetrics(String workerId, WorkerConfig config, Time time, String clusterId) {

Review comment:
       Need to add the parameter to the JavaDoc:
   ```
   @param clusterId  the Kafka cluster ID
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put("group.id", "group-1");
+        workerProps.put("offset.storage.topic", "topic-1");
+        workerProps.put("config.storage.topic", "topic-1");
+        workerProps.put("status.storage.topic", "topic-1");
+        workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+
+        LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
+
+        expectClusterId();
+
+        member = new WorkerGroupMember(config, "", configBackingStore,
+        null, Time.SYSTEM, "client-1", logContext);
+
+        for (MetricsReporter reporter : member.metrics().reporters()) {
+            if (reporter instanceof MockConnectMetrics.MockMetricsReporter) {
+                MockConnectMetrics.MockMetricsReporter mockMetricsReporter = (MockConnectMetrics.MockMetricsReporter) reporter;
+                assertEquals("cluster-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID));
+                assertEquals("group-1", mockMetricsReporter.getMetricsContext().metadata().get(WorkerConfig.CONNECT_GROUP_ID));
+            }
+        }

Review comment:
       It might be good to verify that we entered the `if (reporter instanceof Mock...)` block at least once. Otherwise, we might have a bug elsewhere that failed to instantiate the `MockMetricsReporter` class, and this portion of the test would still pass.
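
   One way to do that, sketched with hypothetical stand-in types (`Reporter`, `MockReporter`, `OtherReporter`) rather than the real `MetricsReporter` and `MockMetricsReporter` classes, is to set a flag inside the `instanceof` branch and check it after the loop:

```java
import java.util.List;

// Sketch only: the nested types are hypothetical stand-ins for the Kafka classes.
final class ReporterCheckSketch {
    interface Reporter { }

    static final class MockReporter implements Reporter {
        final String clusterId;
        MockReporter(String clusterId) { this.clusterId = clusterId; }
    }

    static final class OtherReporter implements Reporter { }

    // Returns true only if at least one MockReporter was seen; throws if a
    // MockReporter carries a cluster ID other than the expected one.
    static boolean verifyMockReporter(List<Reporter> reporters, String expectedClusterId) {
        boolean entered = false;
        for (Reporter reporter : reporters) {
            if (reporter instanceof MockReporter) {
                entered = true;
                String actual = ((MockReporter) reporter).clusterId;
                if (!expectedClusterId.equals(actual)) {
                    throw new AssertionError("unexpected cluster ID: " + actual);
                }
            }
        }
        return entered;
    }
}
```

   In the JUnit test itself, the equivalent would be a local `boolean` set inside the branch, followed by `assertTrue(...)` on it once the loop finishes.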

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       @xiaodongdu, I was not suggesting getting rid of the field. It's fine to have a new field, but we should call the field `kafkaClusterId` rather than `clusterId` since the latter could be misinterpreted to mean the _Connect_ cluster ID.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
##########
@@ -66,20 +67,22 @@ public void configure(final WorkerConfig config) {
         if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be specified");
 
+        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
         data = new HashMap<>();
 
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);

Review comment:
       Should we use `ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId)` a few lines below this? (See the similar [changes you made in `KafkaStatusBackingStore`](https://github.com/apache/kafka/pull/8691/files#diff-5740faccd8040e325fc9ba0e395f31f6R170) and [KafkaConfigBackingStore](https://github.com/apache/kafka/pull/8691/files#diff-1c045c8737aea4c820319a6de65af8a4R462).)

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.MockConnectMetrics;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ConnectUtils.class})
+@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
+public class WorkerGroupMemberTest {
+    @Mock
+    private ConfigBackingStore configBackingStore;
+    @Mock
+    private StatusBackingStore statusBackingStore;
+
+    @Test
+    public void testMetrics() throws Exception {
+        WorkerGroupMember member;
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");

Review comment:
       Let's remove these deprecated configs.
   ```suggestion
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       I really don't have a preference. Past tense is a little odd, but I could see where `changeContext(...)` or `setContext(...)` are present tense and more conventional.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       I really don't have a strong preference. Past tense is a little odd, but I think `changeContext(...)` or `setContext(...)` are present-tense and more conventional.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsContext.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * MetricsContext encapsulates additional contextLabels about metrics exposed via a
+ * {@link org.apache.kafka.common.metrics.MetricsReporter}
+ *
+ * The contextLabels map provides following information:
+ * - a <code>_namespace</node> field indicating the component exposing metrics
+ *   e.g. kafka.server, kafka.consumer
+ *   {@link JmxReporter} uses this as prefix for mbean names
+ *
+ * - for clients and streams libraries: any freeform fields passed in via
+ *   client properties in the form of `metrics.context.<key>=<value>
+ *
+ * - for kafka brokers: kafka.broker.id, kafka.cluster.id
+ * - for connect workers: connect.kafka.cluster.id, connect.group.id
+ */
+@InterfaceStability.Evolving
+public interface MetricsContext {
+    /* predefined fields */
+    String NAMESPACE = "_namespace"; // metrics namespace, formerly jmx prefix
+
+    /**
+     * Returns contextLabels fields
+     */
+    Map<String, String> contextLabels();

Review comment:
       This JavaDoc is incomplete. Perhaps something like:
   ```suggestion
       /**
        * Returns the labels for this metrics context.
        *
        * @return the map of label keys and values; never null but possibly empty
        */
       Map<String, String> contextLabels();
   ```
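
   To make the "never null but possibly empty" contract concrete, here is a minimal sketch of an implementation that would satisfy it. `LabelsSketch` is a hypothetical class, not `KafkaMetricsContext`, and the use of `String.valueOf` and of a read-only view are assumptions of the sketch rather than behavior confirmed by the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch only: LabelsSketch is hypothetical and merely illustrates the Javadoc contract.
final class LabelsSketch {
    // Always initialized, so the accessor can never return null.
    private final Map<String, String> contextLabels = new HashMap<>();

    LabelsSketch(Map<String, ?> labels) {
        // String.valueOf tolerates null values (assumption of this sketch).
        labels.forEach((key, value) -> contextLabels.put(key, String.valueOf(value)));
    }

    // Returns a read-only view; empty when no labels were supplied.
    Map<String, String> contextLabels() {
        return Collections.unmodifiableMap(contextLabels);
    }
}
```

   Returning an unmodifiable view keeps a reporter from mutating labels shared with other reporters.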

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An implementation of MetricsContext; it encapsulates required metrics context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or Service's contextLabels map.
+     */
+    private final Map<String, String> contextLabels = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param contextLabels additional entries to add to the context;
+     *                      values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, ?> contextLabels) {
+        this.contextLabels.put(MetricsContext.NAMESPACE, namespace);
+        contextLabels.forEach((key, value) -> this.contextLabels.put(key, value.toString()));
+    }
+
+    public Map<String, String> contextLabels() {

Review comment:
       This needs an `@Override` annotation:
   ```suggestion
       @Override
       public Map<String, String> contextLabels() {
   ```
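
       For context, a rough sketch of why the annotation matters. `ContextSketch` and `ContextSketchImpl` are hypothetical stand-ins for `MetricsContext` and `KafkaMetricsContext`; the constructor mirrors the diff, but the method body (returning an unmodifiable view) is an assumption, since the quoted hunk ends at the method signature.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MetricsContext so the sketch compiles without Kafka on the classpath.
interface ContextSketch {
    String NAMESPACE = "_namespace";

    Map<String, String> contextLabels();
}

// Mirrors the constructor logic quoted in the diff; the method body below is an assumption.
final class ContextSketchImpl implements ContextSketch {
    private final Map<String, String> contextLabels = new HashMap<>();

    ContextSketchImpl(String namespace, Map<String, ?> labels) {
        this.contextLabels.put(NAMESPACE, namespace);
        // Values of any type are stored as their toString() form.
        labels.forEach((key, value) -> this.contextLabels.put(key, value.toString()));
    }

    // With @Override, the compiler rejects this method if the interface signature ever drifts.
    @Override
    public Map<String, String> contextLabels() {
        // Assumption: expose a read-only view so reporters cannot mutate the shared map.
        return Collections.unmodifiableMap(contextLabels);
    }
}
```

       Without the annotation, renaming the interface method would silently leave this one behind as an unrelated public method.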

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
##########
@@ -65,4 +66,12 @@ default void validateReconfiguration(Map<String, ?> configs) throws ConfigExcept
     default void reconfigure(Map<String, ?> configs) {
     }
 
+    /**
+     * Provides context labels for the service or library exposing metrics
+     *
+     * @param metricsContext the metric context
+     */
+    @InterfaceStability.Evolving
+    default void contextChange(MetricsContext metricsContext) {

Review comment:
       It would also be good to document when this method is called relative to the others. For example, it is always called before `init(...)`. But can it be called again later, or is that the only time it is called?
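
       Until the JavaDoc pins this down, a reporter would have to be written defensively. A minimal sketch of what that might look like follows; `ReporterSketch` is hypothetical and is NOT the Kafka `MetricsReporter` interface, and it takes a plain map rather than a `MetricsContext` to stay self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reporter used only to illustrate call ordering; not the Kafka API.
final class ReporterSketch {
    private Map<String, String> labels = Collections.emptyMap();
    private boolean initialized = false;
    private final List<String> events = new ArrayList<>();

    // Safe to call before init and again afterwards: each call replaces the snapshot.
    void contextChange(Map<String, String> contextLabels) {
        this.labels = Collections.unmodifiableMap(new HashMap<>(contextLabels));
        events.add(initialized ? "contextChange-after-init" : "contextChange-before-init");
    }

    // Stand-in for init(...): by this point the labels are expected to be set.
    void init() {
        initialized = true;
        events.add("init");
    }

    // Returns the current namespace, or an empty string if none was provided.
    String namespace() {
        return labels.getOrDefault("_namespace", "");
    }

    List<String> events() {
        return Collections.unmodifiableList(events);
    }
}
```

       If the contract ends up being "called exactly once, before `init(...)`", the second branch becomes dead code and implementations can be simpler, which is why it is worth stating in the JavaDoc.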



