Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/09 13:04:57 UTC

[GitHub] [kafka] dongjinleekr opened a new pull request #11586: KAFKA-13516: Connection level metrics are not closed

dongjinleekr opened a new pull request #11586:
URL: https://github.com/apache/kafka/pull/11586


   Here is the fix. The core of this approach is to use a `ConcurrentHashMap` that maps each connection id to its corresponding set of sensors, so that a connection's sensors can be found and removed when the connection is closed.
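A minimal sketch of that bookkeeping outside Kafka. `SensorRegistry` and `PerConnectionSensors` are hypothetical stand-ins (a sensor is reduced to its name), not Kafka's `Metrics`/`Sensor` API; only the sensor naming pattern is taken from the diff. It relies on two documented `java.util.Map` behaviors: `computeIfAbsent` runs its mapping function only when the key is absent, and a `computeIfPresent` remapping function that returns `null` removes the entry.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a metrics registry; sensors are reduced to their names.
class SensorRegistry {
    final Set<String> sensors = new HashSet<>();

    String sensor(String name) {
        sensors.add(name);
        return name;
    }

    void removeSensor(String name) {
        sensors.remove(name);
    }
}

// Hypothetical sketch of the per-connection bookkeeping: connection id -> its sensor names.
class PerConnectionSensors {
    private static final String[] SUFFIXES =
        {"requests-sent", "bytes-sent", "responses-received", "bytes-received", "latency"};

    private final SensorRegistry registry;
    private final Map<String, Set<String>> connectionMetrics = new ConcurrentHashMap<>();

    PerConnectionSensors(SensorRegistry registry) {
        this.registry = registry;
    }

    void maybeRegister(String connectionId) {
        if (connectionId.isEmpty()) return;
        // The mapping function runs only if no entry exists yet, so repeated calls are no-ops.
        connectionMetrics.computeIfAbsent(connectionId, this::createSensors);
    }

    void maybeUnregister(String connectionId) {
        if (connectionId.isEmpty()) return;
        // Returning null from the remapping function removes the entry from the map.
        connectionMetrics.computeIfPresent(connectionId, (id, names) -> {
            names.forEach(registry::removeSensor);
            return null;
        });
    }

    boolean isRegistered(String connectionId) {
        return connectionMetrics.containsKey(connectionId);
    }

    private Set<String> createSensors(String connectionId) {
        Set<String> names = new HashSet<>();
        for (String suffix : SUFFIXES) {
            // Same naming pattern as the diff: "node-" + connectionId + "." + suffix.
            names.add(registry.sensor("node-" + connectionId + "." + suffix));
        }
        return names;
    }
}
```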
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

dajac commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r800030837



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -1144,7 +1149,11 @@ public void close() {
         public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
             this.metrics = metrics;
             this.metricTags = metricTags;
-            this.metricsPerConnection = metricsPerConnection;
+            if (metricsPerConnection) {
+                this.connectionMetrics = new ConcurrentHashMap<>();

Review comment:
       Do we really need a `ConcurrentHashMap` here? My understanding is that the selector should be used by a single thread. Is that not the case here?
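For illustration only: if the map is touched solely from the Selector's own thread, a plain `java.util.HashMap` offers the same `computeIfAbsent` and `remove` behavior without any concurrency machinery. `SingleThreadedConnectionMetrics` is a hypothetical sketch, not code from the PR, and the two sensor names are just examples of the diff's naming pattern.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: per-connection bookkeeping confined to the Selector's thread.
// Not thread safe, in line with the Selector's documented single-threaded use.
class SingleThreadedConnectionMetrics {
    private final Map<String, Set<String>> connectionMetrics = new HashMap<>();

    Set<String> register(String connectionId) {
        // Creates the entry on first use only; later calls return the existing set.
        return connectionMetrics.computeIfAbsent(connectionId, id -> new HashSet<>(Arrays.asList(
            "node-" + id + ".requests-sent",
            "node-" + id + ".bytes-sent")));
    }

    Set<String> unregister(String connectionId) {
        // Returns the removed sensor names (or null) so the caller can drop those sensors.
        return connectionMetrics.remove(connectionId);
    }

    int size() {
        return connectionMetrics.size();
    }
}
```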

##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -1305,44 +1314,62 @@ private Sensor sensor(String name, Sensor... parents) {
         }
 
         public void maybeRegisterConnectionMetrics(String connectionId) {
-            if (!connectionId.isEmpty() && metricsPerConnection) {
-                // if one sensor of the metrics has been registered for the connection,
-                // then all other sensors should have been registered; and vice versa
-                String nodeRequestName = "node-" + connectionId + ".requests-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest == null) {
-                    Map<String, String> tags = new LinkedHashMap<>(metricTags);
-                    tags.put("node-id", "node-" + connectionId);
-
-                    nodeRequest = sensor(nodeRequestName);
-                    nodeRequest.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "request", "requests sent"));
-                    MetricName metricName = metrics.metricName("request-size-avg", perConnectionMetricGrpName, "The average size of requests sent.", tags);
-                    nodeRequest.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-size-max", perConnectionMetricGrpName, "The maximum size of any request sent.", tags);
-                    nodeRequest.add(metricName, new Max());
-
-                    String bytesSentName = "node-" + connectionId + ".bytes-sent";
-                    Sensor bytesSent = sensor(bytesSentName);
-                    bytesSent.add(createMeter(metrics, perConnectionMetricGrpName, tags, "outgoing-byte", "outgoing bytes"));
-
-                    String nodeResponseName = "node-" + connectionId + ".responses-received";
-                    Sensor nodeResponse = sensor(nodeResponseName);
-                    nodeResponse.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "response", "responses received"));
-
-                    String bytesReceivedName = "node-" + connectionId + ".bytes-received";
-                    Sensor bytesReceive = sensor(bytesReceivedName);
-                    bytesReceive.add(createMeter(metrics, perConnectionMetricGrpName, tags, "incoming-byte", "incoming bytes"));
-
-                    String nodeTimeName = "node-" + connectionId + ".latency";
-                    Sensor nodeRequestTime = sensor(nodeTimeName);
-                    metricName = metrics.metricName("request-latency-avg", perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-latency-max", perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Max());
-                }
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfAbsent(connectionId, (key) -> {
+                    // key: connection id
+                    // value: set of sensors (currently null)
+                    return perConnectionSensors(key);
+                });
             }
         }
 
+        public void maybeUnregisterConnectionMetrics(String connectionId) {
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfPresent(connectionId, (key, value) -> {
+                    // key: connection id
+                    // value: set of sensors
+                    for (Sensor sensor : value) {
+                        metrics.removeSensor(sensor.name());
+                    }
+                    return null;
+                });
+            }
+        }
+
+        private Set<Sensor> perConnectionSensors(String connectionId) {

Review comment:
       Would it make sense to create a `ConnectionMetrics` class to hold all the connection metrics? That would also give us an opportunity to improve all the `record*` methods, which could then look up the sensors by `connectionId`.
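One possible shape for the suggested class, as a hypothetical sketch: `SimpleSensor`, `ConnectionMetrics`, and `SelectorMetricsSketch` are illustrative names, and `SimpleSensor` merely accumulates a count and a sum instead of being Kafka's `Sensor`. The idea shown is that a `record*` method fetches all of a connection's sensors with a single lookup keyed by `connectionId`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal sensor: accumulates the number of recordings and their sum.
class SimpleSensor {
    final String name;
    long count;
    double total;

    SimpleSensor(String name) {
        this.name = name;
    }

    void record(double value) {
        count++;
        total += value;
    }
}

// Hypothetical ConnectionMetrics: groups the sensors belonging to one connection id.
class ConnectionMetrics {
    final SimpleSensor bytesSent;
    final SimpleSensor bytesReceived;

    ConnectionMetrics(String connectionId) {
        String prefix = "node-" + connectionId + ".";
        bytesSent = new SimpleSensor(prefix + "bytes-sent");
        bytesReceived = new SimpleSensor(prefix + "bytes-received");
    }
}

// Hypothetical selector-level metrics holding one ConnectionMetrics per connection.
class SelectorMetricsSketch {
    private final Map<String, ConnectionMetrics> connections = new HashMap<>();

    void maybeRegister(String connectionId) {
        connections.computeIfAbsent(connectionId, ConnectionMetrics::new);
    }

    // One lookup by connection id yields all of that connection's sensors.
    void recordBytesSent(String connectionId, long bytes) {
        ConnectionMetrics cm = connections.get(connectionId);
        if (cm != null) {
            cm.bytesSent.record(bytes);
        }
    }

    void recordBytesReceived(String connectionId, long bytes) {
        ConnectionMetrics cm = connections.get(connectionId);
        if (cm != null) {
            cm.bytesReceived.record(bytes);
        }
    }

    ConnectionMetrics get(String connectionId) {
        return connections.get(connectionId);
    }
}
```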







[GitHub] [kafka] dongjinleekr commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

dongjinleekr commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r805725430



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -1305,44 +1314,62 @@ private Sensor sensor(String name, Sensor... parents) {
         }
 
         public void maybeRegisterConnectionMetrics(String connectionId) {
-            if (!connectionId.isEmpty() && metricsPerConnection) {
-                // if one sensor of the metrics has been registered for the connection,
-                // then all other sensors should have been registered; and vice versa
-                String nodeRequestName = "node-" + connectionId + ".requests-sent";
-                Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
-                if (nodeRequest == null) {
-                    Map<String, String> tags = new LinkedHashMap<>(metricTags);
-                    tags.put("node-id", "node-" + connectionId);
-
-                    nodeRequest = sensor(nodeRequestName);
-                    nodeRequest.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "request", "requests sent"));
-                    MetricName metricName = metrics.metricName("request-size-avg", perConnectionMetricGrpName, "The average size of requests sent.", tags);
-                    nodeRequest.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-size-max", perConnectionMetricGrpName, "The maximum size of any request sent.", tags);
-                    nodeRequest.add(metricName, new Max());
-
-                    String bytesSentName = "node-" + connectionId + ".bytes-sent";
-                    Sensor bytesSent = sensor(bytesSentName);
-                    bytesSent.add(createMeter(metrics, perConnectionMetricGrpName, tags, "outgoing-byte", "outgoing bytes"));
-
-                    String nodeResponseName = "node-" + connectionId + ".responses-received";
-                    Sensor nodeResponse = sensor(nodeResponseName);
-                    nodeResponse.add(createMeter(metrics, perConnectionMetricGrpName, tags, new WindowedCount(), "response", "responses received"));
-
-                    String bytesReceivedName = "node-" + connectionId + ".bytes-received";
-                    Sensor bytesReceive = sensor(bytesReceivedName);
-                    bytesReceive.add(createMeter(metrics, perConnectionMetricGrpName, tags, "incoming-byte", "incoming bytes"));
-
-                    String nodeTimeName = "node-" + connectionId + ".latency";
-                    Sensor nodeRequestTime = sensor(nodeTimeName);
-                    metricName = metrics.metricName("request-latency-avg", perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Avg());
-                    metricName = metrics.metricName("request-latency-max", perConnectionMetricGrpName, tags);
-                    nodeRequestTime.add(metricName, new Max());
-                }
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfAbsent(connectionId, (key) -> {
+                    // key: connection id
+                    // value: set of sensors (currently null)
+                    return perConnectionSensors(key);
+                });
             }
         }
 
+        public void maybeUnregisterConnectionMetrics(String connectionId) {
+            if (!connectionId.isEmpty() && connectionMetrics != null) {
+                connectionMetrics.computeIfPresent(connectionId, (key, value) -> {
+                    // key: connection id
+                    // value: set of sensors
+                    for (Sensor sensor : value) {
+                        metrics.removeSensor(sensor.name());
+                    }
+                    return null;
+                });
+            }
+        }
+
+        private Set<Sensor> perConnectionSensors(String connectionId) {

Review comment:
       So... `ConnectionMetrics` should hold all the metrics related to a given `connectionId`, similar to `GroupCoordinatorMetrics` or `SenderMetrics`. Do I understand correctly?







[GitHub] [kafka] dajac commented on pull request #11586: KAFKA-13516: Connection level metrics are not closed

dajac commented on pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#issuecomment-1030581570


   @dongjinleekr Could we also add a unit test which verifies that connection metrics are closed?
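A minimal, self-contained sketch of what such a test could check. `FakeMetrics` and `FakeSelectorMetrics` are hypothetical stand-ins rather than Kafka's `Metrics` and `Selector` classes: after per-connection sensors are registered and the metrics object is closed, no sensor whose name starts with `node-` should remain.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the metrics registry: sensors are reduced to their names.
class FakeMetrics {
    final Set<String> sensorNames = new HashSet<>();

    void addSensor(String name) {
        sensorNames.add(name);
    }

    void removeSensor(String name) {
        sensorNames.remove(name);
    }

    // The assertion a test could make: no per-connection ("node-...") sensor is left.
    boolean hasConnectionSensors() {
        return sensorNames.stream().anyMatch(name -> name.startsWith("node-"));
    }
}

// Hypothetical stand-in for SelectorMetrics that registers per-connection sensors.
class FakeSelectorMetrics implements AutoCloseable {
    private final FakeMetrics metrics;
    private final Map<String, Set<String>> connectionMetrics = new HashMap<>();

    FakeSelectorMetrics(FakeMetrics metrics) {
        this.metrics = metrics;
    }

    void maybeRegisterConnectionMetrics(String connectionId) {
        connectionMetrics.computeIfAbsent(connectionId, id -> {
            Set<String> names = new HashSet<>();
            for (String suffix : new String[] {"bytes-sent", "bytes-received"}) {
                String name = "node-" + id + "." + suffix;
                metrics.addSensor(name);
                names.add(name);
            }
            return names;
        });
    }

    @Override
    public void close() {
        // Remove every per-connection sensor this instance registered.
        for (Set<String> names : connectionMetrics.values()) {
            names.forEach(metrics::removeSensor);
        }
        connectionMetrics.clear();
    }
}
```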





[GitHub] [kafka] dongjinleekr commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

dongjinleekr commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r805725430




Review comment:
       So... `ConnectionMetrics` should hold all the metrics related to a given `connectionId`, in the form of `Set<Sensor>`. Do I understand correctly?







[GitHub] [kafka] splett2 commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

splett2 commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r800073769



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -945,6 +947,8 @@ private void close(KafkaChannel channel, CloseMode closeMode) {
 
         if (idleExpiryManager != null)
             idleExpiryManager.remove(channel.id());
+
+        sensors.maybeUnregisterConnectionMetrics(channel.id());

Review comment:
       I don't agree that we should be removing these metrics when closing a connection.
   
   The per-connection metrics are only created for client-side users of the Selector, so the number of per-connection metrics is reasonably small.
   
   Some of these metrics are meant to be monotonic, like the `count` metrics that are instantiated for the Meters. This means we'd be clearing the value any time a client disconnects from a broker.
   
   I think we should not close these metrics on connection close, and only close them when the Selector itself is closed. What do you think?
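A hypothetical sketch of the lifecycle proposed here. The class and method names are illustrative, and a bare counter stands in for a Meter's `count`: closing a connection leaves its metrics in place, so the count keeps accumulating across reconnects, and everything is released only when the owning Selector is closed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-connection metrics live as long as the Selector, not the connection.
class SelectorScopedConnectionMetrics {
    // Stand-in for the monotonic count kept by a Meter.
    static final class Counter {
        long count;
    }

    private final Map<String, Counter> counters = new HashMap<>();
    private boolean closed;

    void onRequestSent(String connectionId) {
        if (closed) {
            throw new IllegalStateException("selector metrics already closed");
        }
        counters.computeIfAbsent(connectionId, id -> new Counter()).count++;
    }

    // Closing a connection deliberately leaves its counter alone, so it keeps growing after a reconnect.
    void onConnectionClosed(String connectionId) {
    }

    // Closing the Selector releases every per-connection metric at once.
    void close() {
        counters.clear();
        closed = true;
    }

    long requestsSent(String connectionId) {
        Counter counter = counters.get(connectionId);
        return counter == null ? 0 : counter.count;
    }
}
```

The trade-off is that metrics for a connection that never comes back are kept until the Selector closes, which the comment above argues is acceptable because per-connection metrics are only created for client-side users.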







[GitHub] [kafka] dajac commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

dajac commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r800075216



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -945,6 +947,8 @@ private void close(KafkaChannel channel, CloseMode closeMode) {
 
         if (idleExpiryManager != null)
             idleExpiryManager.remove(channel.id());
+
+        sensors.maybeUnregisterConnectionMetrics(channel.id());

Review comment:
       It seems that you are right, David. Closing when the selector is closed makes more sense.







[GitHub] [kafka] splett2 commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

splett2 commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r800194330



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -1144,7 +1149,11 @@ public void close() {
         public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
             this.metrics = metrics;
             this.metricTags = metricTags;
-            this.metricsPerConnection = metricsPerConnection;
+            if (metricsPerConnection) {
+                this.connectionMetrics = new ConcurrentHashMap<>();

Review comment:
       Agreed. The Selector is advertised not to be thread safe.







[GitHub] [kafka] splett2 commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

splett2 commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r800073769




Review comment:
       I don't agree that we should be removing these metrics when closing a connection.
   
   The per-connection metrics are only created for client-side users of the Selector.
   
   Some of these metrics are meant to be monotonic, like the `count` metrics that are instantiated for the Meters. This means we'd be clearing the value any time a client disconnects from a broker.
   
   Given that the number of connection meters instantiated per selector is generally small (it scales with the number of brokers the selector connects to), I think we should not close these metrics on connection close, and only close them when the Selector itself is closed. What do you think?







[GitHub] [kafka] dongjinleekr commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

dongjinleekr commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r805720823



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -1144,7 +1149,11 @@ public void close() {
         public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
             this.metrics = metrics;
             this.metricTags = metricTags;
-            this.metricsPerConnection = metricsPerConnection;
+            if (metricsPerConnection) {
+                this.connectionMetrics = new ConcurrentHashMap<>();

Review comment:
       Oh yes, its Javadoc states: `This class is not thread safe!`







[GitHub] [kafka] ijuma commented on pull request #11586: KAFKA-13516: Connection level metrics are not closed

ijuma commented on pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#issuecomment-1030531283


   Cc @apovzner @splett2 

