Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/19 19:32:09 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

rhauch commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r426971298



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -270,9 +272,15 @@ Duration adminTimeout() {
     List<MetricsReporter> metricsReporters() {
         List<MetricsReporter> reporters = getConfiguredInstances(
                 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-        JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+        JmxReporter jmxReporter = new JmxReporter();
         jmxReporter.configure(this.originals());
         reporters.add(jmxReporter);
+        MetricsContext metricsContext = new KafkaMetricsContext("kafka.connect.mirror");

Review comment:
       I don't think the KIP mentioned this `kafka.connect.mirror` metrics context. It's probably worthwhile to update the KIP and then notify the vote thread of the minor change noticed during implementation.

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);

Review comment:
       Do we need to modify the `KafkaOffsetBackingStore()` constructor? `ConnectUtils.lookupKafkaClusterId(...)` accepts a `WorkerConfig` (the parent class of `DistributedConfig`), and the store already receives one via its `configure(...)` method, so couldn't `configure(...)` call the lookup method itself?
   
   This may seem minor, but it follows the existing pattern for this class.
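
   A minimal sketch of the shape I have in mind, not the actual implementation. It uses stand-in types so it compiles on its own: `WorkerConfigStandIn`, `OffsetBackingStoreSketch`, and the `stub.cluster.id` property are hypothetical, and the static `lookupKafkaClusterId(...)` here only imitates `ConnectUtils.lookupKafkaClusterId(...)`, which talks to the Kafka cluster rather than reading a property.

```java
import java.util.Map;

public class ConfigureLookupSketch {

    // Hypothetical stand-in for WorkerConfig: it only carries raw properties.
    static final class WorkerConfigStandIn {
        private final Map<String, String> props;

        WorkerConfigStandIn(Map<String, String> props) {
            this.props = props;
        }

        String get(String key) {
            return props.get(key);
        }
    }

    // Stand-in for ConnectUtils.lookupKafkaClusterId(...). ASSUMPTION: the
    // "stub.cluster.id" key exists only so this sketch runs without a broker.
    static String lookupKafkaClusterId(WorkerConfigStandIn config) {
        String id = config.get("stub.cluster.id");
        return id != null ? id : "unknown";
    }

    // Hypothetical stand-in for the offset backing store.
    static final class OffsetBackingStoreSketch {
        private String kafkaClusterId;

        // The no-arg constructor stays as it is today.
        OffsetBackingStoreSketch() {
        }

        // configure(...) already receives the worker config, so the
        // cluster ID can be derived here instead of in the constructor.
        void configure(WorkerConfigStandIn config) {
            this.kafkaClusterId = lookupKafkaClusterId(config);
        }

        String kafkaClusterId() {
            return kafkaClusterId;
        }
    }

    public static void main(String[] args) {
        WorkerConfigStandIn config =
                new WorkerConfigStandIn(Map.of("stub.cluster.id", "cluster-abc"));
        OffsetBackingStoreSketch store = new OffsetBackingStoreSketch();
        store.configure(config);
        System.out.println(store.kafkaClusterId());
    }
}
```

   With this shape, the call sites in `MirrorMaker` and `ConnectDistributed` would keep using `new KafkaOffsetBackingStore()` followed by `configure(...)`.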

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);

Review comment:
       Could the `KafkaStatusBackingStore(...)` get the cluster ID from the `distributedConfig` passed into the `configure(...)` method, similar to the `KafkaOffsetBackingStore`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the worker's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##########
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Could the `KafkaConfigBackingStore(...)` get the cluster ID from the `distributedConfig`? One of the reasons why we pass the whole worker config to the constructor is so that we don't have to always modify the constructor to pass in additional information that can be derived from the worker config.
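
   To illustrate the point about constructor signatures, here is a rough sketch with stand-in types. `WorkerConfigStandIn`, `ConfigBackingStoreSketch`, and the `stub.cluster.id` key are hypothetical, the static lookup only imitates `ConnectUtils.lookupKafkaClusterId(...)`, and the real constructor also takes a converter and a config transformer, which are omitted here for brevity.

```java
import java.util.Map;

public class ConfigDerivedCtorSketch {

    // Hypothetical stand-in for the worker config.
    static final class WorkerConfigStandIn {
        private final Map<String, String> props;

        WorkerConfigStandIn(Map<String, String> props) {
            this.props = props;
        }

        String get(String key) {
            return props.get(key);
        }
    }

    // Stand-in for ConnectUtils.lookupKafkaClusterId(...). ASSUMPTION: the
    // real method queries the cluster; "stub.cluster.id" is made up here.
    static String lookupKafkaClusterId(WorkerConfigStandIn config) {
        String id = config.get("stub.cluster.id");
        return id != null ? id : "unknown";
    }

    // Hypothetical stand-in for the config backing store.
    static final class ConfigBackingStoreSketch {
        private final String topic;
        private final String kafkaClusterId;

        // The whole config is passed in, so anything derivable from it
        // (the topic today, the cluster ID with this change) can be added
        // without touching the constructor signature or its callers.
        ConfigBackingStoreSketch(WorkerConfigStandIn config) {
            this.topic = config.get("config.storage.topic");
            this.kafkaClusterId = lookupKafkaClusterId(config);
        }

        String topic() {
            return topic;
        }

        String kafkaClusterId() {
            return kafkaClusterId;
        }
    }

    public static void main(String[] args) {
        WorkerConfigStandIn config = new WorkerConfigStandIn(Map.of(
                "config.storage.topic", "connect-configs",
                "stub.cluster.id", "cluster-abc"));
        ConfigBackingStoreSketch store = new ConfigBackingStoreSketch(config);
        System.out.println(store.topic() + " on " + store.kafkaClusterId());
    }
}
```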

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
       The Kafka cluster ID is passed into the constructor, but is this field supposed to represent the Connect cluster ID or the Kafka cluster ID? Since this is in Connect code, without more context a reader would assume `clusterId` is the Connect cluster ID.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
 
-        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
-                configTransformer);
+                configTransformer,
+                kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
##########
@@ -93,7 +93,7 @@ public static void main(String[] args) {
                 config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
             Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(),
-                                       connectorClientConfigOverridePolicy);
+                                       connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the worker's constructor could get the Kafka cluster ID directly from the worker config.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##########
@@ -101,24 +101,25 @@ public Connect startConnect(Map<String, String> workerProps) {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);

Review comment:
       Same comment here as in `MirrorMaker`: the backing store's `configure(...)` method could get the Kafka cluster ID directly from the worker config.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org