You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2018/03/29 17:25:48 UTC

activemq git commit: AMQ-6940 - Add flag to disable TopicSubscription in flight stats

Repository: activemq
Updated Branches:
  refs/heads/master f69fd6f00 -> 65b0f2ad0


AMQ-6940 - Add flag to disable TopicSubscription in flight stats

To reduce memory usage in some use cases, add a new flag to PolicyEntry
called useTopicSubscriptionInflightStats that allows disabling the
TopicSubscription in-flight stats.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/65b0f2ad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/65b0f2ad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/65b0f2ad

Branch: refs/heads/master
Commit: 65b0f2ad0d48845ad54681ac0eff832de122e2a9
Parents: f69fd6f
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Mar 29 13:23:33 2018 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Mar 29 13:25:36 2018 -0400

----------------------------------------------------------------------
 .../broker/region/TopicSubscription.java        | 96 ++++++++++++--------
 .../broker/region/policy/PolicyEntry.java       | 10 ++
 .../AbstractInflightMessageSizeTest.java        | 43 +++++++--
 ...ableSubscriptionInflightMessageSizeTest.java |  5 +-
 ...ueueSubscriptionInflightMessageSizeTest.java |  5 +-
 ...opicSubscriptionInflightMessageSizeTest.java | 34 ++++++-
 6 files changed, 142 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 4962de6..bf3f97b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -32,7 +32,6 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -72,6 +71,7 @@ public class TopicSubscription extends AbstractSubscription {
     protected ActiveMQMessageAudit audit;
     protected boolean active = false;
     protected boolean discarding = false;
+    private boolean useTopicSubscriptionInflightStats = true;
 
     //Used for inflight message size calculations
     protected final Object dispatchLock = new Object();
@@ -258,8 +258,10 @@ public class TopicSubscription extends AbstractSubscription {
                         synchronized(dispatchLock) {
                             matched.remove();
                             getSubscriptionStatistics().getDispatched().increment();
-                            dispatched.add(new DispatchedNode(node));
-                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+                            if (isUseTopicSubscriptionInflightStats()) {
+                                dispatched.add(new DispatchedNode(node));
+                                getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+                            }
                             node.decrementReferenceCount();
                         }
                         break;
@@ -382,43 +384,55 @@ public class TopicSubscription extends AbstractSubscription {
      * @param ack
      */
     private void updateStatsOnAck(final MessageAck ack) {
-        synchronized(dispatchLock) {
-            boolean inAckRange = false;
-            List<DispatchedNode> removeList = new ArrayList<>();
-            for (final DispatchedNode node : dispatched) {
-                MessageId messageId = node.getMessageId();
-                if (ack.getFirstMessageId() == null
-                        || ack.getFirstMessageId().equals(messageId)) {
-                    inAckRange = true;
-                }
-                if (inAckRange) {
-                    removeList.add(node);
-                    if (ack.getLastMessageId().equals(messageId)) {
-                        break;
+        //Allow disabling inflight stats to save memory usage
+        if (isUseTopicSubscriptionInflightStats()) {
+            synchronized(dispatchLock) {
+                boolean inAckRange = false;
+                List<DispatchedNode> removeList = new ArrayList<>();
+                for (final DispatchedNode node : dispatched) {
+                    MessageId messageId = node.getMessageId();
+                    if (ack.getFirstMessageId() == null
+                            || ack.getFirstMessageId().equals(messageId)) {
+                        inAckRange = true;
+                    }
+                    if (inAckRange) {
+                        removeList.add(node);
+                        if (ack.getLastMessageId().equals(messageId)) {
+                            break;
+                        }
                     }
                 }
-            }
 
-            for (final DispatchedNode node : removeList) {
-                dispatched.remove(node);
-                getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
-                getSubscriptionStatistics().getDequeues().increment();
-
-                final Destination destination = node.getDestination();
-                if (destination != null) {
-                    destination.getDestinationStatistics().getDequeues().increment();
-                    destination.getDestinationStatistics().getInflight().decrement();
-                    if (info.isNetworkSubscription()) {
-                        destination.getDestinationStatistics().getForwards().increment();
-                    }
-                    if (ack.isExpiredAck()) {
-                        destination.getDestinationStatistics().getExpired().increment();
+                for (final DispatchedNode node : removeList) {
+                    dispatched.remove(node);
+                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+
+                    final Destination destination = node.getDestination();
+                    incrementStatsOnAck(destination, ack, 1);
+                    if (!ack.isInTransaction()) {
+                        contractPrefetchExtension(1);
                     }
                 }
-                if (!ack.isInTransaction()) {
-                    contractPrefetchExtension(1);
-                }
             }
+        } else {
+            if (singleDestination && destination != null) {
+                incrementStatsOnAck(destination, ack, ack.getMessageCount());
+            }
+            if (!ack.isInTransaction()) {
+                contractPrefetchExtension(ack.getMessageCount());
+            }
+        }
+    }
+
+    private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
+        getSubscriptionStatistics().getDequeues().add(count);
+        destination.getDestinationStatistics().getDequeues().add(count);
+        destination.getDestinationStatistics().getInflight().subtract(count);
+        if (info.isNetworkSubscription()) {
+            destination.getDestinationStatistics().getForwards().add(count);
+        }
+        if (ack.isExpiredAck()) {
+            destination.getDestinationStatistics().getExpired().add(count);
         }
     }
 
@@ -653,8 +667,10 @@ public class TopicSubscription extends AbstractSubscription {
             md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
             synchronized(dispatchLock) {
                 getSubscriptionStatistics().getDispatched().increment();
-                dispatched.add(new DispatchedNode(node));
-                getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+                if (isUseTopicSubscriptionInflightStats()) {
+                    dispatched.add(new DispatchedNode(node));
+                    getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+                }
             }
 
             // Keep track if this subscription is receiving messages from a single destination.
@@ -764,6 +780,14 @@ public class TopicSubscription extends AbstractSubscription {
         }
     }
 
+    public boolean isUseTopicSubscriptionInflightStats() {
+        return useTopicSubscriptionInflightStats;
+    }
+
+    public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
+        this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
+    }
+
     private static class DispatchedNode {
         private final int size;
         private final MessageId messageId;

http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index c5e5980..164b984 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry {
     private NetworkBridgeFilterFactory networkBridgeFilterFactory;
     private boolean doOptimzeMessageStorage = true;
     private int maxDestinations = -1;
+    private boolean useTopicSubscriptionInflightStats = true;
 
     /*
      * percentage of in-flight messages above which optimize message store is disabled
@@ -323,6 +324,7 @@ public class PolicyEntry extends DestinationMapEntry {
         configurePrefetch(subscription);
         subscription.setUsePrefetchExtension(isUsePrefetchExtension());
         subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+        subscription.setUseTopicSubscriptionInflightStats(isUseTopicSubscriptionInflightStats());
         if (pendingMessageLimitStrategy != null) {
             int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
             int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@@ -1128,4 +1130,12 @@ public class PolicyEntry extends DestinationMapEntry {
     public long getSendFailIfNoSpaceAfterTimeout() {
         return this.sendFailIfNoSpaceAfterTimeout;
     }
+
+    public boolean isUseTopicSubscriptionInflightStats() {
+        return useTopicSubscriptionInflightStats;
+    }
+
+    public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
+        this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
index 07784e7..a127feb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
@@ -38,6 +38,8 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.util.Wait;
 import org.junit.After;
@@ -59,6 +61,7 @@ public abstract class AbstractInflightMessageSizeTest {
     protected Destination amqDestination;
     protected MessageConsumer consumer;
     protected int prefetch = 100;
+    protected boolean useTopicSubscriptionInflightStats;
     final protected int ackType;
     final protected boolean optimizeAcknowledge;
     final protected String destName = "testDest";
@@ -66,20 +69,29 @@ public abstract class AbstractInflightMessageSizeTest {
     @Parameters
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                {ActiveMQSession.SESSION_TRANSACTED, true},
-                {ActiveMQSession.AUTO_ACKNOWLEDGE, true},
-                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true},
-                {ActiveMQSession.CLIENT_ACKNOWLEDGE, true},
-                {ActiveMQSession.SESSION_TRANSACTED, false},
-                {ActiveMQSession.AUTO_ACKNOWLEDGE, false},
-                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false},
-                {ActiveMQSession.CLIENT_ACKNOWLEDGE, false}
+                {ActiveMQSession.SESSION_TRANSACTED, true, true},
+                {ActiveMQSession.AUTO_ACKNOWLEDGE, true, true},
+                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, true},
+                {ActiveMQSession.CLIENT_ACKNOWLEDGE, true, true},
+                {ActiveMQSession.SESSION_TRANSACTED, false, true},
+                {ActiveMQSession.AUTO_ACKNOWLEDGE, false, true},
+                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, true},
+                {ActiveMQSession.CLIENT_ACKNOWLEDGE, false, true},
+                {ActiveMQSession.SESSION_TRANSACTED, true, false},
+                {ActiveMQSession.AUTO_ACKNOWLEDGE, true, false},
+                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, false},
+                {ActiveMQSession.CLIENT_ACKNOWLEDGE, true, false},
+                {ActiveMQSession.SESSION_TRANSACTED, false, false},
+                {ActiveMQSession.AUTO_ACKNOWLEDGE, false, false},
+                {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, false},
+                {ActiveMQSession.CLIENT_ACKNOWLEDGE, false, false}
         });
     }
 
-    public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
+    public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) {
         this.ackType = ackType;
         this.optimizeAcknowledge = optimizeAcknowledge;
+        this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
     }
 
     @Before
@@ -88,6 +100,12 @@ public abstract class AbstractInflightMessageSizeTest {
         brokerService.setDeleteAllMessagesOnStartup(true);
         TransportConnector tcp = brokerService
                 .addConnector("tcp://localhost:0");
+        PolicyEntry policy = new PolicyEntry();
+        policy.setUseTopicSubscriptionInflightStats(useTopicSubscriptionInflightStats);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        brokerService.setDestinationPolicy(pMap);
+
         brokerService.start();
         //used to test optimizeAcknowledge works
         String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
@@ -129,6 +147,8 @@ public abstract class AbstractInflightMessageSizeTest {
      */
     @Test(timeout=15000)
     public void testInflightMessageSize() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
         final long size = sendMessages(10);
 
         assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
@@ -155,6 +175,8 @@ public abstract class AbstractInflightMessageSizeTest {
      */
     @Test(timeout=15000)
     public void testInflightMessageSizePrefetchFilled() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
         final long size = sendMessages(prefetch);
 
         assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@@ -187,6 +209,8 @@ public abstract class AbstractInflightMessageSizeTest {
      */
     @Test(timeout=15000)
     public void testInflightMessageSizePrefetchNotFilled() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
         final long size = sendMessages(prefetch - 10);
 
         assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@@ -227,6 +251,7 @@ public abstract class AbstractInflightMessageSizeTest {
      */
     @Test(timeout=15000)
     public void testInflightMessageSizeRollback() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
         Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED);
 
         final long size = sendMessages(10);

http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
index 29d6cb7..a7e9473 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
@@ -34,8 +34,9 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
 
-    public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
-        super(ackType, optimizeAcknowledge);
+    public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge,
+            boolean useTopicSubscriptionInflightStats) {
+        super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
index 84ddc71..217aefb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
@@ -34,8 +34,9 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
 
-    public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
-        super(ackType, optimizeAcknowledge);
+    public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge,
+            boolean useTopicSubscriptionInflightStats) {
+        super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
index 797d409..132a96d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.statistics;
 
+import static org.junit.Assert.assertTrue;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
@@ -23,6 +25,9 @@ import javax.jms.MessageConsumer;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.Assume;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -33,8 +38,8 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
 
-    public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
-        super(ackType, optimizeAcknowledge);
+    public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) {
+        super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
     }
 
     @Override
@@ -57,4 +62,29 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe
         return new ActiveMQTopic(destName);
     }
 
+    @Test(timeout=15000)
+    public void testInflightMessageSizeDisabled() throws Exception {
+        Assume.assumeFalse(useTopicSubscriptionInflightStats);
+        sendMessages(10);
+
+        Thread.sleep(1000);
+
+        assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() == 0;
+            }
+        }));
+
+        receiveMessages(10);
+
+        Thread.sleep(1000);
+        assertTrue("Inflight message size should still be 0", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getSubscription().getInFlightMessageSize() == 0;
+            }
+        }));
+    }
+
 }