You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/25 22:07:58 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1961 track routed and unrouted messages sent to an address

ARTEMIS-1961 track routed and unrouted messages sent to an address


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9c62531c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9c62531c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9c62531c

Branch: refs/heads/master
Commit: 9c62531c2fe000a3c07feba073332a6829463fb3
Parents: a9916ad
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Jun 29 09:19:15 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 25 18:07:32 2018 -0400

----------------------------------------------------------------------
 .../api/core/management/AddressControl.java     | 12 ++++++++
 .../management/impl/AddressControlImpl.java     | 10 +++++++
 .../core/postoffice/impl/PostOfficeImpl.java    | 29 ++++++++++---------
 .../artemis/core/server/impl/AddressInfo.java   | 25 ++++++++++++++++
 .../management/AddressControlTest.java          | 30 ++++++++++++++++++++
 .../management/AddressControlUsingCoreTest.java | 10 +++++++
 6 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index 93a2822..4bb100b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -104,6 +104,18 @@ public interface AddressControl {
    @Attribute(desc = "number of messages added to all the queues for this address")
    long getMessageCount();
 
+   /**
+    * Returns the number of messages sent to this address that were routed to one or more bindings.
+    */
+   @Attribute(desc = "number of messages routed to one or more bindings")
+   long getRoutedMessageCount();
+
+   /**
+    * Returns the number of messages sent to this address that were not routed to any binding.
+    */
+   @Attribute(desc = "number of messages not routed to any bindings")
+   long getUnRoutedMessageCount();
+
 
    /**
     * @param headers  the message headers and properties to set. Can only

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 82c6a6c..0eb39e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -267,6 +267,16 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
       return getMessageCount(DurabilityType.ALL);
    }
 
+   @Override
+   public long getRoutedMessageCount() {
+      return addressInfo.getRoutedMessageCount();
+   }
+
+   @Override
+   public long getUnRoutedMessageCount() {
+      return addressInfo.getUnRoutedMessageCount();
+   }
+
 
    @Override
    public String sendMessage(final Map<String, String> headers,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 02abf46..ec451f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -842,11 +842,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          throw new IllegalStateException("Message cannot be routed more than once");
       }
 
-      setPagingStore(context.getAddress(message), message);
+      final SimpleString address = context.getAddress(message);
 
-      AtomicBoolean startedTX = new AtomicBoolean(false);
+      setPagingStore(address, message);
 
-      final SimpleString address = context.getAddress(message);
+      AtomicBoolean startedTX = new AtomicBoolean(false);
 
       applyExpiryDelay(message, address);
 
@@ -856,23 +856,24 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       message.cleanupInternalProperties();
 
-      Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message));
+      Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+
+      AddressInfo addressInfo = addressManager.getAddressInfo(address);
 
-      // TODO auto-create queues here?
-      // first check for the auto-queue creation thing
-      if (bindings == null) {
-         // There is no queue with this address, we will check if it needs to be created
-         //         if (queueCreator.create(address)) {
-         // TODO: this is not working!!!!
-         // reassign bindings if it was created
-         //            bindings = addressManager.getBindingsForRoutingAddress(address);
-         //         }
-      }
       if (bindingMove != null) {
          bindingMove.route(message, context);
+         if (addressInfo != null) {
+            addressInfo.incrementRoutedMessageCount();
+         }
       } else if (bindings != null) {
          bindings.route(message, context);
+         if (addressInfo != null) {
+            addressInfo.incrementRoutedMessageCount();
+         }
       } else {
+         if (addressInfo != null) {
+            addressInfo.incrementUnRoutedMessageCount();
+         }
          // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
          if (logger.isDebugEnabled()) {
             logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 4bc540f..0cf9452 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.utils.PrefixUtil;
 
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 public class AddressInfo {
 
@@ -36,6 +37,14 @@ public class AddressInfo {
 
    private boolean internal = false;
 
+   private volatile long routedMessageCount = 0;
+
+   private static final AtomicLongFieldUpdater<AddressInfo> routedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "routedMessageCount");
+
+   private volatile long unRoutedMessageCount = 0;
+
+   private static final AtomicLongFieldUpdater<AddressInfo> unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount");
+
    public AddressInfo(SimpleString name) {
       this(name, EnumSet.noneOf(RoutingType.class));
    }
@@ -155,4 +164,20 @@ public class AddressInfo {
       return this;
    }
 
+   public long incrementRoutedMessageCount() {
+      return routedMessageCountUpdater.incrementAndGet(this);
+   }
+
+   public long incrementUnRoutedMessageCount() {
+      return unRoutedMessageCountUpdater.incrementAndGet(this);
+   }
+
+   public long getRoutedMessageCount() {
+      return routedMessageCountUpdater.get(this);
+   }
+
+   public long getUnRoutedMessageCount() {
+      return unRoutedMessageCountUpdater.get(this);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 9f66bcf..69794d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -341,6 +341,36 @@ public class AddressControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testGetRoutedMessageCounts() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      session.createAddress(address, RoutingType.ANYCAST, false);
+
+      AddressControl addressControl = createManagementControl(address);
+      assertEquals(0, addressControl.getMessageCount());
+
+      ClientProducer producer = session.createProducer(address.toString());
+      producer.send(session.createMessage(false));
+      assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 0, 2000, 100));
+      assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
+
+      session.createQueue(address, RoutingType.ANYCAST, address);
+      producer.send(session.createMessage(false));
+      assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 1, 2000, 100));
+      assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
+
+      session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
+      producer.send(session.createMessage(false));
+      assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
+      assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));
+
+      session.deleteQueue(address);
+      session.deleteQueue(address.concat('2'));
+      producer.send(session.createMessage(false));
+      assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
+      assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 2, 2000, 100));
+   }
+
+   @Test
    public void testSendMessage() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       session.createAddress(address, RoutingType.ANYCAST, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9c62531c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
index d9c9f2e..5e5dc54 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
@@ -104,6 +104,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
          }
 
          @Override
+         public long getRoutedMessageCount() {
+            return (long) proxy.retrieveAttributeValue("routedMessageCount");
+         }
+
+         @Override
+         public long getUnRoutedMessageCount() {
+            return (long) proxy.retrieveAttributeValue("unRoutedMessageCount");
+         }
+
+         @Override
          public String sendMessage(Map<String, String> headers,
                                    int type,
                                    String body,