You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/15 20:36:05 UTC

[activemq-artemis] branch master updated: ARTEMIS-2322: Expose Queue.getRate() data as JMX metric

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new dbb3a90  ARTEMIS-2322: Expose Queue.getRate() data as JMX metric
     new 6896e84  This closes #2751
dbb3a90 is described below

commit dbb3a90fe6d2718fef0b8ae75123519b1404ef1f
Author: Dmitry Volodin <dm...@gmail.com>
AuthorDate: Mon Jul 15 18:02:03 2019 +0300

    ARTEMIS-2322: Expose Queue.getRate() data as JMX metric
---
 .../apache/activemq/artemis/logs/AuditLogger.java  |  7 +++++++
 .../artemis/api/core/management/QueueControl.java  |  6 ++++++
 .../core/management/impl/QueueControlImpl.java     | 23 ++++++++++++++++------
 .../management/QueueControlUsingCoreTest.java      |  5 +++++
 4 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index a07608e..e76b3c9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2242,6 +2242,13 @@ public interface AuditLogger extends BasicLogger {
    @Message(id = 601267, value = "User {0} is creating a core session on target resource {1} {2}", format = Message.Format.MESSAGE_FORMAT)
    void createCoreSession(String user, Object source, Object... args);
 
+   static void getProducedRate(Object source) {
+      LOGGER.getMessageCount(getCaller(), source);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601268, value = "User {0} is getting produced message rate on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+   void getProducedRate(String user, Object source, Object... args);
 
    //hot path log using a different logger
    static void coreSendMessage(Object source, String user, Object... args) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 2443d27..bc4f380 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -100,6 +100,12 @@ public interface QueueControl {
    long getMessageCount();
 
    /**
+    * Returns the rate of writing messages to the queue.
+    */
+   @Attribute(desc = "rate of writing messages to the queue currently (based on default window function)")
+   float getProducedRate();
+
+   /**
     * Returns the persistent size of all messages currently in this queue. The persistent size of a message
     * is the amount of space the message would take up on disk which is used to track how much data there
     * is to consume on this queue
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index b789347..4f18921 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -250,6 +250,17 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public float getProducedRate() {
+      if (AuditLogger.isEnabled()) {
+         AuditLogger.getProducedRate(queue);
+      }
+      checkStarted();
+
+      // This is an attribute, no need to blockOnIO
+      return queue.getRate();
+   }
+
+   @Override
    public long getPersistentSize() {
       if (AuditLogger.isEnabled()) {
          AuditLogger.getPersistentSize(queue);
@@ -842,7 +853,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          AuditLogger.countMessages(queue, filterStr);
       }
 
-      Long value = intenalCountMessages(filterStr, null).get(null);
+      Long value = internalCountMessages(filterStr, null).get(null);
       return value == null ? 0 : value;
    }
 
@@ -852,10 +863,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          AuditLogger.countMessages(queue, filterStr, groupByProperty);
       }
 
-      return JsonUtil.toJsonObject(intenalCountMessages(filterStr, groupByProperty)).toString();
+      return JsonUtil.toJsonObject(internalCountMessages(filterStr, groupByProperty)).toString();
    }
 
-   private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
+   private Map<String, Long> internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
       checkStarted();
 
       clearIO();
@@ -890,7 +901,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          AuditLogger.countDeliveringMessages(queue, filterStr);
       }
 
-      Long value = intenalCountDeliveryMessages(filterStr, null).get(null);
+      Long value = internalCountDeliveryMessages(filterStr, null).get(null);
       return value == null ? 0 : value;
    }
 
@@ -900,10 +911,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          AuditLogger.countDeliveringMessages(queue, filterStr, groupByProperty);
       }
 
-      return JsonUtil.toJsonObject(intenalCountDeliveryMessages(filterStr, groupByProperty)).toString();
+      return JsonUtil.toJsonObject(internalCountDeliveryMessages(filterStr, groupByProperty)).toString();
    }
 
-   private Map<String, Long> intenalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
+   private Map<String, Long> internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
       checkStarted();
 
       clearIO();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 254a287..14f55c9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -207,6 +207,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public float getProducedRate() {
+            return (Long) proxy.retrieveAttributeValue("producedRate", Long.class);
+         }
+
+         @Override
          public long getMessagesAdded() {
             return (Integer) proxy.retrieveAttributeValue("messagesAdded", Integer.class);
          }