You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "chia7712 (via GitHub)" <gi...@apache.org> on 2023/02/21 19:41:35 UTC

[GitHub] [kafka] chia7712 opened a new pull request, #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

chia7712 opened a new pull request, #13285:
URL: https://github.com/apache/kafka/pull/13285

   The collections used by generating metrics are thread-safe already, so the synchronization is unnecessary
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1591249098

   @chia7712 since you last worked on this, there has been a new development. We have introduced newer libraries [1] in Kafka which are much more efficient than copyOnWrite data structures we are using here. Could you please consider reusing those data structures instead?
   
   [1] https://github.com/apache/kafka/pull/13437


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] github-actions[bot] commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1716888660

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has  merge conflicts, please update it with the latest from trunk (or appropriate release branch) <p> If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-13874 Avoid synchronization in SocketServer metrics [kafka]

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 closed pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics
URL: https://github.com/apache/kafka/pull/13285


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1448210313

   Hey @chia7712 
   
   > the mutable ArrayBuffer won't be modified after it is created
   
   I have a different view of this. Please correct me if I am wrong. 
   The number of network threads is dynamically reconfigurable in a broker. `processors` in `SocketServer.scala` has one processor per thread. Hence, it is possible to add/remove entries from `ArrayBuffer[Processor]` while the server is already running. An example of this situation is in `SocketServer.scala#reconfigure()` (line 503) function where we call `addProcessors()` (or `removeProcessors()`) and eventually `processors ++= listenerProcessors`. 
   
   Consider the following scenario which will go wrong with the changes in this PR:
   
   1. Thread 1 (metric thread) tries to read from `processors` at line 119 of `SocketServer.scala`
   2. At the same time, Thread 2 (network thread) invokes `override def reconfigure(configs: util.Map[String, _]): Unit` which calls `addProcessors()` which tried to acquire a lock on `this` and successfully obtains it (because we have removed the acquisition of lock from the metrics and hence Thread 1 is running without acquiring a lock). After obtaining the lock it adds a value to `ArrayBuffer[Processor]`. 
   3. We have both Thread 1 and Thread 2 attempting to read/write from `mutable ArrayBuffer`. This is not a thread safe collection and hence the result could lead to inconsistent state for ArrayBuffer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] chia7712 commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1448290193

   > Consider the following scenario which will go wrong with the changes in this PR:
   
   @divijvaidya thanks for nice explanation. You are right. I will adopt the solution#1 -  reduce the granularity of the lock. PTAL and thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1450075461

   I am afraid the latest approach will also not work. This is because there is a possibility that `ArrayBuffer` is internally performing size expansion while we read at `dataPlaneProcessors.size` or when iterating on it using the `map`. This concurrent access could lead to undefined results (notably, this would not have been a problem if we were using a fixed size array). Also, note that the current implementation works for `controlPlaneAcceptorOpt` since they do not access the `processors` arrayBuffer outside the lock.
   
   When I mentioned, option 1 of using fine grained locking, I actually implied locking on processors object instead of locking on entire SocketServer object. If we go down this path, we will have to change other places in the file to acquire this processor lock when mutation and we have also have to ensure that deadlock doesn't occur when trying to acquire SocketServer lock and Processors lock.
   
   Hence, my suggestion would be to opt for a lock-free concurrent access data structure for storing processors.
   
   Here's our requirement for such a data structure:
   - we don't mutate the data structure frequently, so even if writes are slow, we are ok with that.
   - we require lock-free concurrent reads since we perform a read with every connection setup and every time we emit a metric
   - the size of the data structure is going to be small, in tens to low hundreds entries
   - the data structure should be able to expand it's size since we allow dynamic shrinking and expanding
   
   Based on the above, we can choose to use a ConcurrentHashMap or a CopyOnWriteArrayList for storing processors.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] github-actions[bot] commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1585813159

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has  merge conflicts, please update it with the latest from trunk (or appropriate release branch)
   If this PR is no longer valid or desired, please feel free to close it. If no activity occurrs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1116962787


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -115,8 +115,8 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors).toSeq

Review Comment:
   Maybe this comment would be useful in the code itself?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] chia7712 commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1117116933


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -115,8 +115,8 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors).toSeq

Review Comment:
   > Maybe this comment would be useful in the code itself?
   
   you are right :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1443643050

   Hello @chia7712 
   I have a question about this change. Please correct me if my understanding is wrong.
   
   In `SocketServer.scala`, `dataPlaneAcceptors` is a ConcurrentHashMap and hence, the keys could be be read/written-to by multiple threads. For `size()` computation,  `ConcurrentHashMap` gives an approximate value which should suffice for metrics use case. However, the `processors` are stored in an `ArrayBuffer` which is mutable. 
   
   The purpose of this lock on the entire SocketServer object is to ensure that `processors` are accessed in a thread safe manner. That is why we have a similar lock on entire SocketServer object in `acceptNewConnections()`, `removeProcessors()` etc.
   
   If we remove this lock, the `processors` will be accessed in a thread unsafe manner. Isn't that right?
   
   Perhaps a better way to solve this to 1\ reduce the granularity of the lock since we don't want a lock on entire SocketServer object but just on the processors and 2\ use shared locks and exclusive lock which will allow concurrent reads to happen and the same time entire that writes happen in isolation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] chia7712 commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1113497710


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -115,8 +115,8 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors).toSeq

Review Comment:
   convert it to seq in order to make sure the "size" and "isEmpty" are consistency



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] chia7712 commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1123098018


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -141,8 +142,8 @@ class SocketServer(val config: KafkaConfig,
   }
   newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
   newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)

Review Comment:
   > We cannot convert processors to Scala since it transforms it into a mutable ArrayBuffer.
   
   pardon me. why we can't use "mutable ArrayBuffer" here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] chia7712 commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1123116645


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -115,22 +115,23 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)
+    // copy to an immutable array to avoid concurrency issue when calculating average
     val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
       metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-    }
-    if (dataPlaneProcessors.isEmpty) {
+    }.toArray

Review Comment:
   > To mitigate it, we can store the size in an int before this and then, we don't have to convert ioWaitRatioMetricNames to an array.
   
   the `ioWaitRatioMetricNames` might reflect modifications of `processors`, so the actual size of processors may get changed when we calculate the average. For example, we cache the size = 5 but the `processors` could be increased to 6 when we summarize all `processors`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] chia7712 commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1443786837

   @divijvaidya thanks for feedback!  
   
   >  However, the processors are stored in an ArrayBuffer which is mutable.
   
   pardon me. the mutable ArrayBuffer won't be modified after it is created. It seems to me the "mutable" won't hurt us here.
   
   > If we remove this lock, the processors will be accessed in a thread unsafe manner. Isn't that right?
   
   All we need to update metrics is the `metricTags` of processor. we won't update `metricTags` after processor is created. It seems to me it is thread-safe after we get collection copy from `ConcurrentHashMap` as the collection copy and `metricTags` won't get changed anymore. Also, we have `Option` to handle null metrics if the processor is removed by anther thread.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] chia7712 commented on pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "chia7712 (via GitHub)" <gi...@apache.org>.
chia7712 commented on PR #13285:
URL: https://github.com/apache/kafka/pull/13285#issuecomment-1450145297

   @divijvaidya thanks for great feedback!
   
   >  we have also have to ensure that deadlock doesn't occur when trying to acquire SocketServer lock and Processors lock.
   
   that is interesting. the mutation of processors is locked by `Acceptor` object. It means getting only lock of SocketServer is not safe when we plan to access the `processors`. We should require `Acceptor` lock also. In other words, this issue is related not only performance but also potential bug (concurrent issue).
   
   > we can choose to use a ConcurrentHashMap or a CopyOnWriteArrayList for storing processors.
   
   I will use `CopyOnWriteArrayList`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1122955794


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -606,7 +607,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
     newPort
   }
 
-  private[network] val processors = new ArrayBuffer[Processor]()
+  private[network] val processors = new CopyOnWriteArrayList[Processor]()

Review Comment:
   It would be nice if you could add a comment here on why we chose this data structure. Folks who look at this code in future will have a clear explanation of choices and tradeoffs we made for this.



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -141,8 +142,8 @@ class SocketServer(val config: KafkaConfig,
   }
   newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
   newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)

Review Comment:
   We cannot convert processors to Scala since it transforms it into a mutable ArrayBuffer.
   
   We probably don't need Scala transformations here. Could you please try something like this:
   `dataPlaneAcceptors.values.stream.flatMap(a => a.processors.stream)`
   
   (same comment for other places such as line 119)



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -115,22 +115,23 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => {
+    val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)
+    // copy to an immutable array to avoid concurrency issue when calculating average
     val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
       metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-    }
-    if (dataPlaneProcessors.isEmpty) {
+    }.toArray

Review Comment:
   This leads to object creation on every call to this metric (which is going to happen frequently). Do we really want this?
   
   If I understand correctly, your motivation is to guard against scenarios where the number of processors between the time when we calculate `ioWaitRatioMetricNames` and when we calculate `dataPlaneProcessors.size`. To mitigate it, we can store the size in an int before this and then, we don't have to convert `ioWaitRatioMetricNames` to an array.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13285: KAFKA-13874 Avoid synchronization in SocketServer metrics

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13285:
URL: https://github.com/apache/kafka/pull/13285#discussion_r1123127295


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -115,22 +115,15 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
-    val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
-    val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
-      metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
-    }
-    if (dataPlaneProcessors.isEmpty) {
-      1.0
-    } else {
-      ioWaitRatioMetricNames.map { metricName =>
-        Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-      }.sum / dataPlaneProcessors.size
-    }
-  })
+  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () =>
+    dataPlaneAcceptors.values.stream().flatMap(a => a.processors.stream())

Review Comment:
   nit
   
   `()` can be removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org