You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2020/06/03 14:26:55 UTC

[storm] branch master updated: STORM-3641 upgrade metrics API for JCQueue

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new e3852a2  STORM-3641 upgrade metrics API for JCQueue
     new df2e8d6  Merge pull request #3277 from agresch/agresch_storm_3641
e3852a2 is described below

commit e3852a24eb994e3acc63551c99766e2fdd91bcdf
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Jun 1 11:37:37 2020 -0500

    STORM-3641 upgrade metrics API for JCQueue
---
 .../storm/perf/queuetest/JCQueuePerfTest.java      |  28 ++--
 .../storm/daemon/metrics/BuiltinMetricsUtil.java   |   8 --
 .../apache/storm/daemon/worker/WorkerState.java    |  11 +-
 .../apache/storm/daemon/worker/WorkerTransfer.java |   3 +-
 .../apache/storm/executor/bolt/BoltExecutor.java   |   6 -
 .../apache/storm/executor/spout/SpoutExecutor.java |   2 -
 .../jvm/org/apache/storm/metrics2/JcMetrics.java   |  39 ------
 .../apache/storm/metrics2/StormMetricRegistry.java |   9 +-
 .../src/jvm/org/apache/storm/utils/JCQueue.java    | 150 +++++----------------
 .../jvm/org/apache/storm/utils/JCQueueMetrics.java | 125 +++++++++++++++++
 .../storm/utils/JCQueueBackpressureTest.java       |   4 +-
 .../jvm/org/apache/storm/utils/JCQueueTest.java    |   4 +-
 12 files changed, 189 insertions(+), 200 deletions(-)

diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 13d880b..1843755 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.storm.perf.queuetest;
 
-import java.util.concurrent.locks.LockSupport;
+import java.util.Collections;
 import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.policy.WaitStrategyPark;
 import org.apache.storm.utils.JCQueue;
-import org.apache.storm.utils.MutableLong;
 
 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
 public class JCQueuePerfTest {
@@ -48,8 +47,8 @@ public class JCQueuePerfTest {
     private static void ackingProducerSimulation() {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", 1000, 1000, registry);
-        JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", 1000, 1000, registry);
+        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
         final Acker acker = new Acker(ackQ, spoutQ);
@@ -60,8 +59,9 @@ public class JCQueuePerfTest {
     private static void producerFwdConsumer(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
+        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",
+                Collections.singletonList(1000), 1000, registry);
+        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final Producer prod = new Producer(q1);
         final Forwarder fwd = new Forwarder(q1, q2);
@@ -72,8 +72,8 @@ public class JCQueuePerfTest {
 
 
     private static void oneProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
-            new StormMetricRegistry());
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+                Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
         final Consumer cons1 = new Consumer(q1);
@@ -82,8 +82,8 @@ public class JCQueuePerfTest {
     }
 
     private static void twoProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
-            new StormMetricRegistry());
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+                Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
         final Producer prod2 = new Producer(q1);
@@ -93,8 +93,8 @@ public class JCQueuePerfTest {
     }
 
     private static void threeProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
-            new StormMetricRegistry());
+        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+                Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
         final Producer prod2 = new Producer(q1);
@@ -108,8 +108,8 @@ public class JCQueuePerfTest {
     private static void oneProducer2Consumers(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
+        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final Producer2 prod1 = new Producer2(q1, q2);
         final Consumer cons1 = new Consumer(q1);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 2d72e78..72d660f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -49,14 +49,6 @@ public class BuiltinMetricsUtil {
         registerMetric("__send-iconnection", metric, topoConf, context);
     }
 
-    public static void registerQueueMetrics(Map<String, JCQueue> queues, Map<String, Object> topoConf, TopologyContext context) {
-        for (Map.Entry<String, JCQueue> entry : queues.entrySet()) {
-            String name = "__" + entry.getKey();
-            IMetric metric = new StateMetric(entry.getValue());
-            registerMetric(name, metric, topoConf, context);
-        }
-    }
-
     public static void registerMetric(String name, IMetric metric, Map<String, Object> topoConf, TopologyContext context) {
         int bucketSize = ((Number) topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
         context.registerMetric(name, metric, bucketSize);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index eaab4e9..70840ac 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -240,11 +240,6 @@ public class WorkerState {
         this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse);
     }
 
-    private static double getQueueLoad(JCQueue queue) {
-        JCQueue.QueueMetrics queueMetrics = queue.getMetrics();
-        return ((double) queueMetrics.population()) / queueMetrics.capacity();
-    }
-
     public static boolean isConnectionReady(IConnection connection) {
         return !(connection instanceof ConnectionWithStatus)
                || ((ConnectionWithStatus) connection).status() == ConnectionWithStatus.Status.Ready;
@@ -475,7 +470,7 @@ public class WorkerState {
         Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
         Map<Integer, Double> localLoad = new HashMap<>();
         for (IRunningExecutor exec : execs) {
-            double receiveLoad = getQueueLoad(exec.getReceiveQueue());
+            double receiveLoad = exec.getReceiveQueue().getQueueLoad();
             localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
         }
 
@@ -683,10 +678,10 @@ public class WorkerState {
         IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
         Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
         for (List<Long> executor : executors) {
-            int port = this.getPort();
+            List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
             receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
                                                       recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
-                this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort(), metricRegistry));
+                this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
 
         }
         return receiveQueueMap;
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index 88f5921..916850e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.daemon.worker;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +67,7 @@ class WorkerTransfer implements JCQueue.Consumer {
         }
 
         this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
-            workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, workerState.getPort(),
+            workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-1), workerState.getPort(),
             workerState.getMetricRegistry());
     }
 
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index dd62e6a..68ea9da 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -122,9 +122,6 @@ public class BoltExecutor extends Executor {
                 ((ICredentialsListener) boltObject).setCredentials(credentials);
             }
             if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
-                Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue, "transfer", workerData.getTransferQueue());
-                BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
-
                 Map<NodeInfo, IConnection> cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
                 BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
                 BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
@@ -138,9 +135,6 @@ public class BoltExecutor extends Executor {
                         }
                     }
                 }
-            } else {
-                Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
-                BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
             }
 
             this.outputCollector = new BoltOutputCollectorImpl(this, taskData, rand, hasEventLoggers, ackingEnabled, isDebug);
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index cb2af7f..6f4d7dd 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -140,8 +140,6 @@ public class SpoutExecutor extends Executor {
             this.outputCollectors.add(outputCollector);
 
             builtInMetrics.registerAll(topoConf, taskData.getUserContext());
-            Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
-            BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, taskData.getUserContext());
 
             if (spoutObject instanceof ICredentialsListener) {
                 ((ICredentialsListener) spoutObject).setCredentials(credentials);
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java
deleted file mode 100644
index 82b9d13..0000000
--- a/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.metrics2;
-
-import org.apache.storm.utils.JCQueue;
-
-public class JcMetrics {
-    private final SimpleGauge<Long> capacity;
-    private final SimpleGauge<Long> population;
-
-    JcMetrics(SimpleGauge<Long> capacity,
-              SimpleGauge<Long> population) {
-        this.capacity = capacity;
-        this.population = population;
-    }
-
-    public void setCapacity(Long capacity) {
-        this.capacity.set(capacity);
-    }
-
-    public void setPopulation(Long population) {
-        this.population.set(population);
-    }
-
-    public void set(JCQueue.QueueMetrics metrics) {
-        this.capacity.set(metrics.capacity());
-        this.population.set(metrics.population());
-    }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 3401706..4d62cf4 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -67,10 +67,11 @@ public class StormMetricRegistry {
         return gauge;
     }
 
-    public JcMetrics jcMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port) {
-        SimpleGauge<Long> capacityGauge = gauge(0L, name + "-capacity", topologyId, componentId, taskId, port);
-        SimpleGauge<Long> populationGauge = gauge(0L, name + "-population", topologyId, componentId, taskId, port);
-        return new JcMetrics(capacityGauge, populationGauge);
+    public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, Integer taskId, Integer port) {
+        MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
+        gauge = registry.register(metricNames.getLongName(), gauge);
+        saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
+        return gauge;
     }
 
     public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index b0f2d9f..4a40f82 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -20,17 +20,9 @@ package org.apache.storm.utils;
 
 import java.io.Closeable;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.storm.metric.api.IStatefulObject;
-import org.apache.storm.metric.internal.RateTracker;
-import org.apache.storm.metrics2.JcMetrics;
+import java.util.List;
 import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.policy.IWaitStrategy;
-import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.storm.shade.org.jctools.queues.MessagePassingQueue;
 import org.apache.storm.shade.org.jctools.queues.MpscArrayQueue;
 import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
@@ -38,17 +30,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
-public class JCQueue implements IStatefulObject, Closeable {
+public class JCQueue implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
-    private static final String PREFIX = "jc-";
-    private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR =
-        new ScheduledThreadPoolExecutor(1,
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat(PREFIX + "metrics-reporter")
-                .build());
     private final ExitCondition continueRunning = () -> true;
-    private final JcMetrics jcMetrics;
+    private final List<JCQueueMetrics> jcqMetrics = new ArrayList<>();
     private final MpscArrayQueue<Object> recvQueue;
     // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
     private final MpscUnboundedArrayQueue<Object> overflowQ;
@@ -56,32 +41,24 @@ public class JCQueue implements IStatefulObject, Closeable {
     private final int producerBatchSz;
     private final DirectInserter directInserter = new DirectInserter(this);
     private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
-    private final JCQueue.QueueMetrics metrics;
     private final IWaitStrategy backPressureWaitStrategy;
     private final String queueName;
 
     public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
-                   String topologyId, String componentId, Integer taskId, int port, StormMetricRegistry metricRegistry) {
+                   String topologyId, String componentId, List<Integer> taskIds, int port, StormMetricRegistry metricRegistry) {
         this.queueName = queueName;
         this.overflowLimit = overflowLimit;
         this.recvQueue = new MpscArrayQueue<>(size);
         this.overflowQ = new MpscUnboundedArrayQueue<>(size);
 
-        this.metrics = new JCQueue.QueueMetrics();
-        this.jcMetrics = metricRegistry.jcMetrics(queueName, topologyId, componentId, taskId, port);
+        for (Integer taskId : taskIds) {
+            this.jcqMetrics.add(new JCQueueMetrics(queueName, topologyId, componentId, taskId, port,
+                    metricRegistry, recvQueue, overflowQ));
+        }
 
         //The batch size can be no larger than half the full recvQueue size, to avoid contention issues.
         this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2));
         this.backPressureWaitStrategy = backPressureWaitStrategy;
-
-        if (!METRICS_REPORTER_EXECUTOR.isShutdown()) {
-            METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
-                @Override
-                public void run() {
-                    jcMetrics.set(metrics);
-                }
-            }, 15, 15, TimeUnit.SECONDS);
-        }
     }
 
     public String getName() {
@@ -90,9 +67,9 @@ public class JCQueue implements IStatefulObject, Closeable {
 
     @Override
     public void close() {
-        //No need to block, the task run by the executor is safe to run even after metrics are closed
-        METRICS_REPORTER_EXECUTOR.shutdown();
-        metrics.close();
+        for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+            jcQueueMetric.close();
+        }
     }
 
     /**
@@ -118,6 +95,10 @@ public class JCQueue implements IStatefulObject, Closeable {
         return recvQueue.size() + overflowQ.size();
     }
 
+    public double getQueueLoad() {
+        return ((double) recvQueue.size()) / recvQueue.capacity();
+    }
+
     /**
      * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q.
      */
@@ -149,7 +130,9 @@ public class JCQueue implements IStatefulObject, Closeable {
     // Non Blocking. returns true/false indicating success/failure. Fails if full.
     private boolean tryPublishInternal(Object obj) {
         if (recvQueue.offer(obj)) {
-            metrics.notifyArrivals(1);
+            for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+                jcQueueMetric.notifyArrivals(1);
+            }
             return true;
         }
         return false;
@@ -167,7 +150,9 @@ public class JCQueue implements IStatefulObject, Closeable {
                 }
             };
         int count = recvQueue.fill(supplier, objs.size());
-        metrics.notifyArrivals(count);
+        for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+            jcQueueMetric.notifyArrivals(count);
+        }
         return count;
     }
 
@@ -221,7 +206,9 @@ public class JCQueue implements IStatefulObject, Closeable {
     }
 
     public void recordMsgDrop() {
-        getMetrics().notifyDroppedMsg();
+        for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+            jcQueueMetric.notifyDroppedMsg();
+        }
     }
 
     public boolean isEmptyOverflow() {
@@ -254,16 +241,6 @@ public class JCQueue implements IStatefulObject, Closeable {
         return inserter.tryFlush();
     }
 
-    @Override
-    public Object getState() {
-        return metrics.getState();
-    }
-
-    //This method enables the metrics to be accessed from outside of the JCQueue class
-    public JCQueue.QueueMetrics getMetrics() {
-        return metrics;
-    }
-
     private interface Inserter {
         // blocking call that can be interrupted using Thread.interrupt()
         void publish(Object obj) throws InterruptedException;
@@ -301,7 +278,9 @@ public class JCQueue implements IStatefulObject, Closeable {
             boolean inserted = queue.tryPublishInternal(obj);
             int idleCount = 0;
             while (!inserted) {
-                queue.metrics.notifyInsertFailure();
+                for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+                    jcQueueMetric.notifyInsertFailure();
+                }
                 if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
                     LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
                 }
@@ -322,7 +301,9 @@ public class JCQueue implements IStatefulObject, Closeable {
         public boolean tryPublish(Object obj) {
             boolean inserted = queue.tryPublishInternal(obj);
             if (!inserted) {
-                queue.metrics.notifyInsertFailure();
+                for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+                    jcQueueMetric.notifyInsertFailure();
+                }
                 return false;
             }
             return true;
@@ -387,7 +368,9 @@ public class JCQueue implements IStatefulObject, Closeable {
             int publishCount = queue.tryPublishInternal(currentBatch);
             int retryCount = 0;
             while (publishCount == 0) { // retry till at least 1 element is drained
-                queue.metrics.notifyInsertFailure();
+                for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+                    jcQueueMetric.notifyInsertFailure();
+                }
                 if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
                     LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
                 }
@@ -411,7 +394,9 @@ public class JCQueue implements IStatefulObject, Closeable {
             }
             int publishCount = queue.tryPublishInternal(currentBatch);
             if (publishCount == 0) {
-                queue.metrics.notifyInsertFailure();
+                for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+                    jcQueueMetric.notifyInsertFailure();
+                }
                 return false;
             } else {
                 currentBatch.subList(0, publishCount).clear();
@@ -419,67 +404,4 @@ public class JCQueue implements IStatefulObject, Closeable {
             }
         }
     } // class BatchInserter
-
-    /**
-     * This inner class provides methods to access the metrics of the JCQueue.
-     */
-    public class QueueMetrics implements Closeable {
-        private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
-        private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
-        private final AtomicLong droppedMessages = new AtomicLong(0);
-
-        public long population() {
-            return recvQueue.size();
-        }
-
-        public long capacity() {
-            return recvQueue.capacity();
-        }
-
-        public Object getState() {
-            Map<String, Object> state = new HashMap<>();
-
-            final double arrivalRateInSecs = arrivalsTracker.reportRate();
-
-            long tuplePop = population();
-
-            // Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
-            // If this assumption does not hold, the calculation of sojourn time should also consider
-            // departure rate according to Queuing Theory.
-            final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
-
-            long cap = capacity();
-            float pctFull = (1.0F * tuplePop / cap);
-
-            state.put("capacity", cap);
-            state.put("pct_full", pctFull);
-            state.put("population", tuplePop);
-
-            state.put("arrival_rate_secs", arrivalRateInSecs);
-            state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
-            state.put("insert_failures", insertFailuresTracker.reportRate());
-            state.put("dropped_messages", droppedMessages);
-            state.put("overflow", overflowQ.size());
-            return state;
-        }
-
-        public void notifyArrivals(long counts) {
-            arrivalsTracker.notify(counts);
-        }
-
-        public void notifyInsertFailure() {
-            insertFailuresTracker.notify(1);
-        }
-
-        public void notifyDroppedMsg() {
-            droppedMessages.incrementAndGet();
-        }
-
-        @Override
-        public void close() {
-            arrivalsTracker.close();
-            insertFailuresTracker.close();
-        }
-
-    }
 }
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
new file mode 100644
index 0000000..eec5b33
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import com.codahale.metrics.Gauge;
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.shade.org.jctools.queues.MpscArrayQueue;
+import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
+
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
+public class JCQueueMetrics implements Closeable {
+    private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
+    private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
+    private final AtomicLong droppedMessages = new AtomicLong(0);
+
+    public JCQueueMetrics(String queueName, String topologyId, String componentId, int taskId, int port,
+                          StormMetricRegistry metricRegistry, MpscArrayQueue<Object> receiveQ,
+                          MpscUnboundedArrayQueue<Object> overflowQ) {
+
+        Gauge<Integer> cap = new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return receiveQ.capacity();
+            }
+        };
+
+        Gauge<Float> pctFull = new Gauge<Float>() {
+            @Override
+            public Float getValue() {
+                return (1.0F * receiveQ.size() / receiveQ.capacity());
+            }
+        };
+
+        Gauge<Integer> pop = new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return receiveQ.size();
+            }
+        };
+
+        Gauge<Double> arrivalRate = new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return arrivalsTracker.reportRate();
+            }
+        };
+
+        Gauge<Double> sojourn = new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                // Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
+                // If this assumption does not hold, the calculation of sojourn time should also consider
+                // departure rate according to Queuing Theory.
+                return receiveQ.size() / Math.max(arrivalsTracker.reportRate(), 0.00001) * 1000.0;
+            }
+        };
+
+        Gauge<Double> insertFailures = new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return insertFailuresTracker.reportRate();
+            }
+        };
+
+        Gauge<Long> dropped = new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return droppedMessages.get();
+            }
+        };
+
+        Gauge<Integer> overflow = new Gauge<Integer>() {
+            @Override
+            public Integer getValue() {
+                return overflowQ.size();
+            }
+        };
+
+        metricRegistry.gauge(queueName + "-capacity", cap, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(queueName + "-pct_full", pctFull, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(queueName + "-population", pop, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(queueName + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(queueName + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(queueName + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(queueName + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(queueName + "-overflow", overflow, topologyId, componentId, taskId, port);
+    }
+
+    public void notifyArrivals(long counts) {
+        arrivalsTracker.notify(counts);
+    }
+
+    public void notifyInsertFailure() {
+        insertFailuresTracker.notify(1);
+    }
+
+    public void notifyDroppedMsg() {
+        droppedMessages.incrementAndGet();
+    }
+
+    @Override
+    public void close() {
+        arrivalsTracker.close();
+        insertFailuresTracker.close();
+    }
+}
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 2cfe25d..07a9123 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -12,17 +12,17 @@
 
 package org.apache.storm.utils;
 
+import java.util.Collections;
 import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.policy.WaitStrategyPark;
 import org.apache.storm.utils.JCQueue.Consumer;
 import org.junit.Assert;
 import org.junit.Test;
 
-
 public class JCQueueBackpressureTest {
     
     private static JCQueue createQueue(String name, int queueSize) {
-        return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", 1000, 1000, new StormMetricRegistry());
+        return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
     }
 
     @Test
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index 0d597fc..4a98937 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -14,6 +14,7 @@ package org.apache.storm.utils;
 import static org.junit.Assert.assertFalse;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.storm.metrics2.StormMetricRegistry;
@@ -21,7 +22,6 @@ import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.policy.WaitStrategyPark;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 public class JCQueueTest {
@@ -157,7 +157,7 @@ public class JCQueueTest {
     }
 
     private JCQueue createQueue(String name, int batchSize, int queueSize) {
-        return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", 1000, 1000, new StormMetricRegistry());
+        return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
     }
 
     private static class IncProducer implements Runnable {