You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2018/03/29 13:02:02 UTC

activemq git commit: AMQ-6940 - Reduce memory footprint for inflight statistics

Repository: activemq
Updated Branches:
  refs/heads/master 8fd82ff69 -> f69fd6f00


AMQ-6940 - Reduce memory footprint for inflight statistics

For the TopicSubscription case we can reduce the inflight statistics
memory footprint by not storing the entire message reference for in
flight messages and instead just a subset of the information needed.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f69fd6f0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f69fd6f0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f69fd6f0

Branch: refs/heads/master
Commit: f69fd6f0020290752a7424479821c22d94f9b8b7
Parents: 8fd82ff
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Mar 29 08:54:17 2018 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Mar 29 09:00:26 2018 -0400

----------------------------------------------------------------------
 .../broker/region/TopicSubscription.java        | 57 +++++++++++++++-----
 1 file changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f69fd6f0/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 4b958f3..4962de6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -32,6 +32,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -74,7 +75,7 @@ public class TopicSubscription extends AbstractSubscription {
 
     //Used for inflight message size calculations
     protected final Object dispatchLock = new Object();
-    protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
+    protected final List<DispatchedNode> dispatched = new ArrayList<>();
 
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
@@ -257,7 +258,7 @@ public class TopicSubscription extends AbstractSubscription {
                         synchronized(dispatchLock) {
                             matched.remove();
                             getSubscriptionStatistics().getDispatched().increment();
-                            dispatched.add(node);
+                            dispatched.add(new DispatchedNode(node));
                             getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
                             node.decrementReferenceCount();
                         }
@@ -383,8 +384,8 @@ public class TopicSubscription extends AbstractSubscription {
     private void updateStatsOnAck(final MessageAck ack) {
         synchronized(dispatchLock) {
             boolean inAckRange = false;
-            List<MessageReference> removeList = new ArrayList<MessageReference>();
-            for (final MessageReference node : dispatched) {
+            List<DispatchedNode> removeList = new ArrayList<>();
+            for (final DispatchedNode node : dispatched) {
                 MessageId messageId = node.getMessageId();
                 if (ack.getFirstMessageId() == null
                         || ack.getFirstMessageId().equals(messageId)) {
@@ -398,17 +399,21 @@ public class TopicSubscription extends AbstractSubscription {
                 }
             }
 
-            for (final MessageReference node : removeList) {
+            for (final DispatchedNode node : removeList) {
                 dispatched.remove(node);
                 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
                 getSubscriptionStatistics().getDequeues().increment();
-                ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
-                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
-                if (info.isNetworkSubscription()) {
-                    ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
-                }
-                if (ack.isExpiredAck()) {
-                    destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
+
+                final Destination destination = node.getDestination();
+                if (destination != null) {
+                    destination.getDestinationStatistics().getDequeues().increment();
+                    destination.getDestinationStatistics().getInflight().decrement();
+                    if (info.isNetworkSubscription()) {
+                        destination.getDestinationStatistics().getForwards().increment();
+                    }
+                    if (ack.isExpiredAck()) {
+                        destination.getDestinationStatistics().getExpired().increment();
+                    }
                 }
                 if (!ack.isInTransaction()) {
                     contractPrefetchExtension(1);
@@ -648,7 +653,7 @@ public class TopicSubscription extends AbstractSubscription {
             md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
             synchronized(dispatchLock) {
                 getSubscriptionStatistics().getDispatched().increment();
-                dispatched.add(node);
+                dispatched.add(new DispatchedNode(node));
                 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
             }
 
@@ -759,4 +764,30 @@ public class TopicSubscription extends AbstractSubscription {
         }
     }
 
+    private static class DispatchedNode {
+        private final int size;
+        private final MessageId messageId;
+        private final Destination destination;
+
+        public DispatchedNode(final MessageReference node) {
+            super();
+            this.size = node.getSize();
+            this.messageId = node.getMessageId();
+            this.destination = node.getRegionDestination() instanceof Destination ?
+                    ((Destination)node.getRegionDestination()) : null;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        public Destination getDestination() {
+            return destination;
+        }
+    }
+
 }