You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/25 22:07:57 UTC
[1/2] activemq-artemis git commit: This closes #2172
Repository: activemq-artemis
Updated Branches:
refs/heads/master a9916adcb -> 2453978f4
This closes #2172
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2453978f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2453978f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2453978f
Branch: refs/heads/master
Commit: 2453978f415f64d18ad44ea6e983ae128daa7098
Parents: a9916ad 9c62531
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Sep 25 18:07:32 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 25 18:07:32 2018 -0400
----------------------------------------------------------------------
.../api/core/management/AddressControl.java | 12 ++++++++
.../management/impl/AddressControlImpl.java | 10 +++++++
.../core/postoffice/impl/PostOfficeImpl.java | 29 ++++++++++---------
.../artemis/core/server/impl/AddressInfo.java | 25 ++++++++++++++++
.../management/AddressControlTest.java | 30 ++++++++++++++++++++
.../management/AddressControlUsingCoreTest.java | 10 +++++++
6 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1961 track routed and unrouted messages sent to an address
Posted by cl...@apache.org.
ARTEMIS-1961 track routed and unrouted messages sent to an address
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9c62531c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9c62531c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9c62531c
Branch: refs/heads/master
Commit: 9c62531c2fe000a3c07feba073332a6829463fb3
Parents: a9916ad
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Jun 29 09:19:15 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 25 18:07:32 2018 -0400
----------------------------------------------------------------------
.../api/core/management/AddressControl.java | 12 ++++++++
.../management/impl/AddressControlImpl.java | 10 +++++++
.../core/postoffice/impl/PostOfficeImpl.java | 29 ++++++++++---------
.../artemis/core/server/impl/AddressInfo.java | 25 ++++++++++++++++
.../management/AddressControlTest.java | 30 ++++++++++++++++++++
.../management/AddressControlUsingCoreTest.java | 10 +++++++
6 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index 93a2822..4bb100b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -104,6 +104,18 @@ public interface AddressControl {
@Attribute(desc = "number of messages added to all the queues for this address")
long getMessageCount();
+ /**
+ * Returns the number of messages routed to one or more bindings
+ */
+ @Attribute(desc = "number of messages routed to one or more bindings")
+ long getRoutedMessageCount();
+
+ /**
+ * Returns the number of messages not routed to any bindings
+ */
+ @Attribute(desc = "number of messages not routed to any bindings")
+ long getUnRoutedMessageCount();
+
/**
* @param headers the message headers and properties to set. Can only
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 82c6a6c..0eb39e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -267,6 +267,16 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return getMessageCount(DurabilityType.ALL);
}
+ @Override
+ public long getRoutedMessageCount() {
+ return addressInfo.getRoutedMessageCount();
+ }
+
+ @Override
+ public long getUnRoutedMessageCount() {
+ return addressInfo.getUnRoutedMessageCount();
+ }
+
@Override
public String sendMessage(final Map<String, String> headers,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 02abf46..ec451f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -842,11 +842,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw new IllegalStateException("Message cannot be routed more than once");
}
- setPagingStore(context.getAddress(message), message);
+ final SimpleString address = context.getAddress(message);
- AtomicBoolean startedTX = new AtomicBoolean(false);
+ setPagingStore(address, message);
- final SimpleString address = context.getAddress(message);
+ AtomicBoolean startedTX = new AtomicBoolean(false);
applyExpiryDelay(message, address);
@@ -856,23 +856,24 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.cleanupInternalProperties();
- Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message));
+ Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+
+ AddressInfo addressInfo = addressManager.getAddressInfo(address);
- // TODO auto-create queues here?
- // first check for the auto-queue creation thing
- if (bindings == null) {
- // There is no queue with this address, we will check if it needs to be created
- // if (queueCreator.create(address)) {
- // TODO: this is not working!!!!
- // reassign bindings if it was created
- // bindings = addressManager.getBindingsForRoutingAddress(address);
- // }
- }
if (bindingMove != null) {
bindingMove.route(message, context);
+ if (addressInfo != null) {
+ addressInfo.incrementRoutedMessageCount();
+ }
} else if (bindings != null) {
bindings.route(message, context);
+ if (addressInfo != null) {
+ addressInfo.incrementRoutedMessageCount();
+ }
} else {
+ if (addressInfo != null) {
+ addressInfo.incrementUnRoutedMessageCount();
+ }
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
if (logger.isDebugEnabled()) {
logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 4bc540f..0cf9452 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
import java.util.EnumSet;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class AddressInfo {
@@ -36,6 +37,14 @@ public class AddressInfo {
private boolean internal = false;
+ private volatile long routedMessageCount = 0;
+
+ private static final AtomicLongFieldUpdater<AddressInfo> routedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "routedMessageCount");
+
+ private volatile long unRoutedMessageCount = 0;
+
+ private static final AtomicLongFieldUpdater<AddressInfo> unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount");
+
public AddressInfo(SimpleString name) {
this(name, EnumSet.noneOf(RoutingType.class));
}
@@ -155,4 +164,20 @@ public class AddressInfo {
return this;
}
+ public long incrementRoutedMessageCount() {
+ return routedMessageCountUpdater.incrementAndGet(this);
+ }
+
+ public long incrementUnRoutedMessageCount() {
+ return unRoutedMessageCountUpdater.incrementAndGet(this);
+ }
+
+ public long getRoutedMessageCount() {
+ return routedMessageCountUpdater.get(this);
+ }
+
+ public long getUnRoutedMessageCount() {
+ return unRoutedMessageCountUpdater.get(this);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 9f66bcf..69794d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -341,6 +341,36 @@ public class AddressControlTest extends ManagementTestBase {
}
@Test
+ public void testGetRoutedMessageCounts() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createAddress(address, RoutingType.ANYCAST, false);
+
+ AddressControl addressControl = createManagementControl(address);
+ assertEquals(0, addressControl.getMessageCount());
+
+ ClientProducer producer = session.createProducer(address.toString());
+ producer.send(session.createMessage(false));
+ assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 0, 2000, 100));
+ assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
+
+ session.createQueue(address, RoutingType.ANYCAST, address);
+ producer.send(session.createMessage(false));
+ assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 1, 2000, 100));
+ assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
+
+ session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
+ producer.send(session.createMessage(false));
+ assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
+ assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
+
+ session.deleteQueue(address);
+ session.deleteQueue(address.concat('2'));
+ producer.send(session.createMessage(false));
+ assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
+ assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 2, 2000, 100));
+ }
+
+ @Test
public void testSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
index d9c9f2e..5e5dc54 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
@@ -104,6 +104,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
}
@Override
+ public long getRoutedMessageCount() {
+ return (long) proxy.retrieveAttributeValue("routedMessageCount");
+ }
+
+ @Override
+ public long getUnRoutedMessageCount() {
+ return (long) proxy.retrieveAttributeValue("unRoutedMessageCount");
+ }
+
+ @Override
public String sendMessage(Map<String, String> headers,
int type,
String body,