You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/10/27 05:52:55 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2150 Counts the number of delivering messages in this queue

ARTEMIS-2150 Counts the number of delivering messages in this queue


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dba405c4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dba405c4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dba405c4

Branch: refs/heads/master
Commit: dba405c404bc4aeaff4ab960a75036e9520fe3f1
Parents: 3f3046c
Author: Arthur Fritz Santiago <ar...@db1.com.br>
Authored: Fri Oct 26 15:32:34 2018 -0300
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Sat Oct 27 06:52:56 2018 +0100

----------------------------------------------------------------------
 .../api/core/management/QueueControl.java       |  17 ++
 .../core/management/impl/QueueControlImpl.java  |  61 ++++++-
 .../management/QueueControlTest.java            | 183 +++++++++++++++++++
 .../management/QueueControlUsingCoreTest.java   |  10 +
 4 files changed, 261 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index b210530..b57be21 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -312,6 +312,23 @@ public interface QueueControl {
    String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
 
    /**
+    * Counts the number of delivering messages in this queue matching the specified filter.
+    * <br>
+    * Using {@code null} or an empty filter will count <em>all</em> delivering messages from this queue.
+    */
+   @Operation(desc = "Returns the number of delivering messages in the queue matching the given filter")
+   long countDeliveringMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
+
+   /**
+    * Counts the number of delivering messages in this queue matching the specified filter, grouped by the given property field.
+    * Messages whose group-by property value is {@code null} are grouped under "null".
+    * <br>
+    * Using {@code null} or an empty filter will count <em>all</em> delivering messages from this queue.
+    */
+   @Operation(desc = "Returns the number of delivering messages in the queue matching the given filter, grouped by the given property field")
+   String countDeliveringMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
+
+   /**
     * Removes the message corresponding to the specified message ID.
     *
     * @return {@code true} if the message was removed, {@code false} else

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 3db5cae..d62978f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -706,9 +706,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
       checkStarted();
+
       clearIO();
-      Map<String, Long> result = new HashMap<>();
 
+      Map<String, Long> result = new HashMap<>();
       try {
          Filter filter = FilterImpl.createFilter(filterStr);
          SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
@@ -719,15 +720,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                try {
                   while (iterator.hasNext()) {
                      Message message = iterator.next().getMessage();
-                     if (filter == null || filter.match(message)) {
-                        if (groupByProperty == null) {
-                           result.compute(null, (k,v) -> v == null ? 1 : ++v);
-                        } else {
-                           Object value = message.getObjectProperty(groupByProperty);
-                           String valueStr = value == null ? null : value.toString();
-                           result.compute(valueStr, (k,v) -> v == null ? 1 : ++v);
-                        }
-                     }
+                     internalComputeMessage(result, filter, groupByProperty, message);
                   }
                } catch (NoSuchElementException ignored) {
                   // this could happen through paging browsing
@@ -741,6 +734,54 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public long countDeliveringMessages(final String filterStr) throws Exception {
+      Long value = intenalCountDeliveryMessages(filterStr, null).get(null);
+      return value == null ? 0 : value;
+   }
+
+   @Override
+   public String countDeliveringMessages(final String filterStr, final String groupByProperty) throws Exception {
+      return JsonUtil.toJsonObject(intenalCountDeliveryMessages(filterStr, groupByProperty)).toString();
+   }
+
+   private Map<String, Long> intenalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
+      checkStarted();
+
+      clearIO();
+
+      Map<String, Long> result = new HashMap<>();
+      try {
+         Filter filter = FilterImpl.createFilter(filterStr);
+         SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
+         if (filter == null && groupByProperty == null) {
+            result.put(null, Long.valueOf(getDeliveringCount()));
+         } else {
+            Map<String, List<MessageReference>> deliveringMessages = queue.getDeliveringMessages();
+            deliveringMessages.forEach((s, messageReferenceList) ->
+                            messageReferenceList.forEach(messageReference ->
+                                    internalComputeMessage(result, filter, groupByProperty, messageReference.getMessage())
+                            ));
+         }
+         return result;
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   private void internalComputeMessage(Map<String, Long> result, Filter filter, SimpleString groupByProperty, Message message) {
+      if (filter == null || filter.match(message)) {
+         if (groupByProperty == null) {
+            result.compute(null, (k, v) -> v == null ? 1 : ++v);
+         } else {
+            Object value = message.getObjectProperty(groupByProperty);
+            String valueStr = value == null ? null : value.toString();
+            result.compute(valueStr, (k, v) -> v == null ? 1 : ++v);
+         }
+      }
+   }
+
+
+   @Override
    public boolean removeMessage(final long messageID) throws Exception {
       checkStarted();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 5ba0b39..d64eba6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1704,6 +1704,189 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testCountDeliveringMessageCountWithFilter() throws Exception {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = RandomUtil.randomPositiveLong();
+      long unmatchingValue = matchingValue + 1;
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+      ClientProducer producer = session.createProducer(address);
+
+      ClientMessage matchingMessage = session.createMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      ClientMessage unmatchingMessage = session.createMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(matchingMessage);
+      producer.send(unmatchingMessage);
+      producer.send(matchingMessage);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
+      Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + matchingValue));
+      Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
+
+      ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+      ClientMessage message = consumer.receive(500);
+      Assert.assertNotNull(message);
+      Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
+      Assert.assertEquals(2, queueControl.countDeliveringMessages(key + " =" + matchingValue));
+      Assert.assertEquals(1, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+
+   @Test
+   public void testCountDeliveringMessageCountNoFilter() throws Exception {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = RandomUtil.randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+      ClientProducer producer = session.createProducer(address);
+
+      ClientMessage matchingMessage = session.createMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      ClientMessage unmatchingMessage = session.createMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(matchingMessage);
+      producer.send(unmatchingMessage);
+      producer.send(matchingMessage);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
+
+      ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+      ClientMessage message = consumer.receive(500);
+      Assert.assertNotNull(message);
+      Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+
+   @Test
+   public void testCountDeliveringMessageCountNoGroupNoFilter() throws Exception {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = RandomUtil.randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+      ClientProducer producer = session.createProducer(address);
+
+      ClientMessage matchingMessage = session.createMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      ClientMessage unmatchingMessage = session.createMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(matchingMessage);
+      producer.send(unmatchingMessage);
+      producer.send(matchingMessage);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      String result = queueControl.countDeliveringMessages(null, null);
+      JsonObject jsonObject = JsonUtil.readJsonObject(result);
+      Assert.assertEquals(0, jsonObject.getInt("null"));
+
+      ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+      ClientMessage message = consumer.receive(500);
+      Assert.assertNotNull(message);
+
+      result = queueControl.countDeliveringMessages(null, null);
+      jsonObject = JsonUtil.readJsonObject(result);
+      Assert.assertEquals(3, jsonObject.getInt("null"));
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+
+   @Test
+   public void testCountDeliveringMessageCountGroupNoFilter() throws Exception {
+      String key = "key_group";
+      String valueGroup1 = "group_1";
+      String valueGroup2 = "group_2";
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+      ClientProducer producer = session.createProducer(address);
+
+      ClientMessage message1 = session.createMessage(false);
+      message1.putStringProperty(key, valueGroup1);
+      ClientMessage message2 = session.createMessage(false);
+      message2.putStringProperty(key, valueGroup2);
+      producer.send(message1);
+      producer.send(message2);
+      producer.send(message1);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      String result = queueControl.countDeliveringMessages(null, key);
+      JsonObject jsonObject = JsonUtil.readJsonObject(result);
+      Assert.assertTrue(jsonObject.isEmpty()); // no consumer has been created yet
+
+      ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+      ClientMessage message = consumer.receive(500);
+      Assert.assertNotNull(message);
+
+      result = queueControl.countDeliveringMessages(null, key);
+      jsonObject = JsonUtil.readJsonObject(result);
+      Assert.assertEquals(2, jsonObject.getInt(valueGroup1)); // message1 was sent twice
+      Assert.assertEquals(1, jsonObject.getInt(valueGroup2)); // message2 was sent once
+      Assert.assertFalse(jsonObject.containsKey(null));
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+
+   @Test
+   public void testCountDeliveringMessageCountGroupFilter() throws Exception {
+      String key = "key_group";
+      long valueGroup1 = RandomUtil.randomLong();
+      long valueGroup2 = valueGroup1 + 1;
+
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+      ClientProducer producer = session.createProducer(address);
+
+      ClientMessage message1 = session.createMessage(false);
+      message1.putLongProperty(key, valueGroup1);
+      ClientMessage message2 = session.createMessage(false);
+      message2.putLongProperty(key, valueGroup2);
+      producer.send(message1);
+      producer.send(message2);
+      producer.send(message1);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      String result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
+      JsonObject jsonObject = JsonUtil.readJsonObject(result);
+      Assert.assertTrue(jsonObject.isEmpty()); // no consumer has been created yet
+
+      ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+      ClientMessage message = consumer.receive(500);
+      Assert.assertNotNull(message);
+
+      result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
+      jsonObject = JsonUtil.readJsonObject(result);
+      Assert.assertEquals(2, jsonObject.getInt(String.valueOf(valueGroup1))); // message1 was sent twice
+      Assert.assertFalse(jsonObject.containsKey(String.valueOf(valueGroup2))); // excluded by the filter
+      Assert.assertFalse(jsonObject.containsKey(null));
+
+      consumer.close();
+      session.deleteQueue(queue);
+   }
+
+   @Test
    public void testCountMessagesWithFilter() throws Exception {
       SimpleString key = new SimpleString("key");
       long matchingValue = RandomUtil.randomLong();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 26ada03..1f2032a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -107,6 +107,16 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public long countDeliveringMessages(final String filter) throws Exception {
+            return (Long) proxy.invokeOperation(Long.class, "countDeliveringMessages", filter);
+         }
+
+         @Override
+         public String countDeliveringMessages(final String filter, final String groupByFilter) throws Exception {
+            return (String) proxy.invokeOperation(String.class, "countDeliveringMessages", filter, groupByFilter); // name matches the QueueControl operation
+         }
+
+         @Override
          public boolean expireMessage(final long messageID) throws Exception {
             return (Boolean) proxy.invokeOperation("expireMessage", messageID);
          }