You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/10/27 05:52:54 UTC
[1/2] activemq-artemis git commit: This closes #2399
Repository: activemq-artemis
Updated Branches:
refs/heads/master 3f3046c5e -> a78ed9a2b
This closes #2399
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a78ed9a2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a78ed9a2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a78ed9a2
Branch: refs/heads/master
Commit: a78ed9a2b38e22c2ff4727094079ef3e2b7c013c
Parents: 3f3046c dba405c
Author: Michael Andre Pearce <mi...@me.com>
Authored: Sat Oct 27 06:52:56 2018 +0100
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Sat Oct 27 06:52:56 2018 +0100
----------------------------------------------------------------------
.../api/core/management/QueueControl.java | 17 ++
.../core/management/impl/QueueControlImpl.java | 61 ++++++-
.../management/QueueControlTest.java | 183 +++++++++++++++++++
.../management/QueueControlUsingCoreTest.java | 10 +
4 files changed, 261 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-2150 Counts the number of
delivering messages in this queue
Posted by mi...@apache.org.
ARTEMIS-2150 Counts the number of delivering messages in this queue
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dba405c4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dba405c4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dba405c4
Branch: refs/heads/master
Commit: dba405c404bc4aeaff4ab960a75036e9520fe3f1
Parents: 3f3046c
Author: Arthur Fritz Santiago <ar...@db1.com.br>
Authored: Fri Oct 26 15:32:34 2018 -0300
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Sat Oct 27 06:52:56 2018 +0100
----------------------------------------------------------------------
.../api/core/management/QueueControl.java | 17 ++
.../core/management/impl/QueueControlImpl.java | 61 ++++++-
.../management/QueueControlTest.java | 183 +++++++++++++++++++
.../management/QueueControlUsingCoreTest.java | 10 +
4 files changed, 261 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index b210530..b57be21 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -312,6 +312,23 @@ public interface QueueControl {
String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
/**
+ * Counts the number of delivering messages in this queue matching the specified filter.
+ * <br>
+ * Using {@code null} or an empty filter will count <em>all</em> delivering messages from this queue.
+ */
+ @Operation(desc = "Returns the number of delivering messages in the queue matching the given filter")
+ long countDeliveringMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
+
+ /**
+ * Counts the number of delivering messages in this queue matching the specified filter, grouped by the given property field.
+ * Messages for which the property is {@code null} are counted under the group "null".
+ * <br>
+ * Using {@code null} or an empty filter will count <em>all</em> delivering messages from this queue.
+ */
+ @Operation(desc = "Returns the number of delivering messages in the queue matching the given filter, grouped by the given property field")
+ String countDeliveringMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception;
+
+ /**
* Removes the message corresponding to the specified message ID.
*
* @return {@code true} if the message was removed, {@code false} else
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 3db5cae..d62978f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -706,9 +706,10 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();
+
clearIO();
- Map<String, Long> result = new HashMap<>();
+ Map<String, Long> result = new HashMap<>();
try {
Filter filter = FilterImpl.createFilter(filterStr);
SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr);
@@ -719,15 +720,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
while (iterator.hasNext()) {
Message message = iterator.next().getMessage();
- if (filter == null || filter.match(message)) {
- if (groupByProperty == null) {
- result.compute(null, (k,v) -> v == null ? 1 : ++v);
- } else {
- Object value = message.getObjectProperty(groupByProperty);
- String valueStr = value == null ? null : value.toString();
- result.compute(valueStr, (k,v) -> v == null ? 1 : ++v);
- }
- }
+ internalComputeMessage(result, filter, groupByProperty, message);
}
} catch (NoSuchElementException ignored) {
// this could happen through paging browsing
@@ -741,6 +734,54 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
+ public long countDeliveringMessages(final String filterStr) throws Exception {
+ final Long total = intenalCountDeliveryMessages(filterStr, null).get(null); // without grouping, the tally is stored under the null key
+ return total != null ? total : 0L; // no entry means no delivering message matched
+ }
+
+ @Override
+ public String countDeliveringMessages(final String filterStr, final String groupByProperty) throws Exception {
+ return JsonUtil.toJsonObject(intenalCountDeliveryMessages(filterStr, groupByProperty)).toString(); // per-group tallies, converted to a JSON object and returned as its string form
+ }
+
+ private Map<String, Long> intenalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
+ checkStarted();
+ // Tallies delivering messages per group value; the null key carries the ungrouped total.
+ clearIO();
+ final Map<String, Long> counts = new HashMap<>();
+ try {
+ final Filter messageFilter = FilterImpl.createFilter(filterStr);
+ final SimpleString groupKey = SimpleString.toSimpleString(groupByPropertyStr);
+ if (messageFilter != null || groupKey != null) {
+ // A filter or a grouping was requested, so every delivering reference is inspected.
+ for (List<MessageReference> references : queue.getDeliveringMessages().values()) {
+ for (MessageReference reference : references) {
+ internalComputeMessage(counts, messageFilter, groupKey, reference.getMessage());
+ }
+ }
+ } else {
+ counts.put(null, Long.valueOf(getDeliveringCount()));
+ }
+ return counts;
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ private void internalComputeMessage(Map<String, Long> result, Filter filter, SimpleString groupByProperty, Message message) {
+ // Messages rejected by the filter are not counted at all.
+ if (filter != null && !filter.match(message)) {
+ return;
+ }
+ // Without a group-by property every counted message is tallied under the null key.
+ Object groupValue = groupByProperty == null ? null : message.getObjectProperty(groupByProperty);
+ String groupKey = groupValue == null ? null : groupValue.toString();
+ // Adds one to the tally for this key, starting at 1 when the key is not present yet.
+ result.merge(groupKey, 1L, Long::sum);
+ }
+
+
+ @Override
public boolean removeMessage(final long messageID) throws Exception {
checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 5ba0b39..d64eba6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1704,6 +1704,189 @@ public class QueueControlTest extends ManagementTestBase {
}
@Test
+ public void testCountDeliveringMessageCountWithFilter() throws Exception {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = RandomUtil.randomPositiveLong();
+ long unmatchingValue = matchingValue + 1;
+ // Use a randomly named address and queue for this test.
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ // Create the queue and a producer on its address.
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+ // Send the message with matchingValue twice and the message with unmatchingValue once.
+ ClientMessage matchingMessage = session.createMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+ // Before any consumer is created, every delivering count is expected to be zero.
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + matchingValue));
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
+ // After a consumer has received a message, all three sent messages are expected to count as delivering, split 2/1 by filter.
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
+ Assert.assertEquals(2, queueControl.countDeliveringMessages(key + " =" + matchingValue));
+ Assert.assertEquals(1, queueControl.countDeliveringMessages(key + " =" + unmatchingValue));
+ // Close the consumer and delete the queue created above.
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountNoFilter() throws Exception {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = RandomUtil.randomLong();
+ long unmatchingValue = matchingValue + 1;
+ // Use a randomly named address and queue for this test.
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ // Create the queue and a producer on its address.
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+ // Send three messages in total; the property values play no role because no filter is passed below.
+ ClientMessage matchingMessage = session.createMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+ // Before any consumer is created, the delivering count is expected to be zero.
+ QueueControl queueControl = createManagementControl(address, queue);
+ Assert.assertEquals(0, queueControl.countDeliveringMessages(null));
+ // After a consumer has received a message, all three sent messages are expected to count as delivering.
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(3, queueControl.countDeliveringMessages(null));
+ // Close the consumer and delete the queue created above.
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountNoGroupNoFilter() throws Exception {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = RandomUtil.randomLong();
+ long unmatchingValue = matchingValue + 1;
+ // Use a randomly named address and queue for this test.
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ // Create the queue and a producer on its address.
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+ // Send three messages in total; neither a filter nor a group-by property is passed below.
+ ClientMessage matchingMessage = session.createMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ ClientMessage unmatchingMessage = session.createMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(matchingMessage);
+ producer.send(unmatchingMessage);
+ producer.send(matchingMessage);
+ // Before any consumer is created, the JSON result is expected to hold 0 under the key "null".
+ QueueControl queueControl = createManagementControl(address, queue);
+ String result = queueControl.countDeliveringMessages(null, null);
+ JsonObject jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(0, jsonObject.getInt("null"));
+ // Attach a consumer and wait for one message to arrive.
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ // All three sent messages are now expected to be reported under the key "null".
+ result = queueControl.countDeliveringMessages(null, null);
+ jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(3, jsonObject.getInt("null"));
+ // Close the consumer and delete the queue created above.
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountGroupNoFilter() throws Exception {
+ String key = "key_group";
+ String valueGroup1 = "group_1";
+ String valueGroup2 = "group_2";
+ // Use a randomly named address and queue for this test.
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ // Create the queue and a producer on its address.
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+ // Send the group_1 message twice and the group_2 message once.
+ ClientMessage message1 = session.createMessage(false);
+ message1.putStringProperty(key, valueGroup1);
+ ClientMessage message2 = session.createMessage(false);
+ message2.putStringProperty(key, valueGroup2);
+ producer.send(message1);
+ producer.send(message2);
+ producer.send(message1);
+ // Before any consumer is created, the grouped result is expected to be an empty JSON object.
+ QueueControl queueControl = createManagementControl(address, queue);
+ String result = queueControl.countDeliveringMessages(null, key);
+ JsonObject jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertTrue(jsonObject.isEmpty());
+ // Attach a consumer and wait for one message to arrive.
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ // Every sent message carries the property, so the "null" group (the JSON key used for a null group value) must be absent.
+ result = queueControl.countDeliveringMessages(null, key);
+ jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(2, jsonObject.getInt(valueGroup1));
+ Assert.assertEquals(1, jsonObject.getInt(valueGroup2));
+ Assert.assertFalse(jsonObject.containsKey("null"));
+ // Close the consumer and delete the queue created above.
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
+ public void testCountDeliveringMessageCountGroupFilter() throws Exception {
+ String key = "key_group";
+ long valueGroup1 = RandomUtil.randomLong();
+ long valueGroup2 = valueGroup1 + 1;
+ // Use a randomly named address and queue for this test.
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ // Create the queue and a producer on its address.
+ session.createQueue(address, queue, null, false);
+ ClientProducer producer = session.createProducer(address);
+ // Send the valueGroup1 message twice and the valueGroup2 message once.
+ ClientMessage message1 = session.createMessage(false);
+ message1.putLongProperty(key, valueGroup1);
+ ClientMessage message2 = session.createMessage(false);
+ message2.putLongProperty(key, valueGroup2);
+ producer.send(message1);
+ producer.send(message2);
+ producer.send(message1);
+ // Before any consumer is created, the filtered and grouped result is expected to be an empty JSON object.
+ QueueControl queueControl = createManagementControl(address, queue);
+ String result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
+ JsonObject jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertTrue(jsonObject.isEmpty());
+ // Attach a consumer and wait for one message to arrive.
+ ClientConsumer consumer = session.createConsumer(queue, null, 1024 * 1024, 1, false);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ // The filter keeps only valueGroup1 messages, so neither the valueGroup2 group nor the "null" group may appear.
+ result = queueControl.countDeliveringMessages(key + " =" + valueGroup1, key);
+ jsonObject = JsonUtil.readJsonObject(result);
+ Assert.assertEquals(2, jsonObject.getInt(String.valueOf(valueGroup1)));
+ Assert.assertFalse(jsonObject.containsKey(String.valueOf(valueGroup2)));
+ Assert.assertFalse(jsonObject.containsKey("null"));
+ // Close the consumer and delete the queue created above.
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ @Test
public void testCountMessagesWithFilter() throws Exception {
SimpleString key = new SimpleString("key");
long matchingValue = RandomUtil.randomLong();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dba405c4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 26ada03..1f2032a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -107,6 +107,16 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
@Override
+ public long countDeliveringMessages(final String filter) throws Exception {
+ return (Long) proxy.invokeOperation(Long.class, "countDeliveringMessages", filter); // invokes the operation by name through proxy and casts the reply to Long
+ }
+
+ @Override
+ public String countDeliveringMessages(final String filter, final String groupByFilter) throws Exception {
+ return (String) proxy.invokeOperation(String.class, "countDeliveringMessages", filter, groupByFilter); // operation name must be the QueueControl method name; "countDeliveringProperty" is not a declared operation
+ }
+
+ @Override
public boolean expireMessage(final long messageID) throws Exception {
return (Boolean) proxy.invokeOperation("expireMessage", messageID);
}