You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2018/03/29 17:25:48 UTC
activemq git commit: AMQ-6940 - Add flag to disable TopicSubscription in flight stats
Repository: activemq
Updated Branches:
refs/heads/master f69fd6f00 -> 65b0f2ad0
AMQ-6940 - Add flag to disable TopicSubscription in flight stats
To reduce memory usage in some use cases, add a new flag to PolicyEntry
called useTopicSubscriptionInflightStats that allows disabling the
inflight stats.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/65b0f2ad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/65b0f2ad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/65b0f2ad
Branch: refs/heads/master
Commit: 65b0f2ad0d48845ad54681ac0eff832de122e2a9
Parents: f69fd6f
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Mar 29 13:23:33 2018 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Mar 29 13:25:36 2018 -0400
----------------------------------------------------------------------
.../broker/region/TopicSubscription.java | 96 ++++++++++++--------
.../broker/region/policy/PolicyEntry.java | 10 ++
.../AbstractInflightMessageSizeTest.java | 43 +++++++--
...ableSubscriptionInflightMessageSizeTest.java | 5 +-
...ueueSubscriptionInflightMessageSizeTest.java | 5 +-
...opicSubscriptionInflightMessageSizeTest.java | 34 ++++++-
6 files changed, 142 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 4962de6..bf3f97b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -32,7 +32,6 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@@ -72,6 +71,7 @@ public class TopicSubscription extends AbstractSubscription {
protected ActiveMQMessageAudit audit;
protected boolean active = false;
protected boolean discarding = false;
+ private boolean useTopicSubscriptionInflightStats = true;
//Used for inflight message size calculations
protected final Object dispatchLock = new Object();
@@ -258,8 +258,10 @@ public class TopicSubscription extends AbstractSubscription {
synchronized(dispatchLock) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
- dispatched.add(new DispatchedNode(node));
- getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+ if (isUseTopicSubscriptionInflightStats()) {
+ dispatched.add(new DispatchedNode(node));
+ getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+ }
node.decrementReferenceCount();
}
break;
@@ -382,43 +384,55 @@ public class TopicSubscription extends AbstractSubscription {
* @param ack
*/
private void updateStatsOnAck(final MessageAck ack) {
- synchronized(dispatchLock) {
- boolean inAckRange = false;
- List<DispatchedNode> removeList = new ArrayList<>();
- for (final DispatchedNode node : dispatched) {
- MessageId messageId = node.getMessageId();
- if (ack.getFirstMessageId() == null
- || ack.getFirstMessageId().equals(messageId)) {
- inAckRange = true;
- }
- if (inAckRange) {
- removeList.add(node);
- if (ack.getLastMessageId().equals(messageId)) {
- break;
+ //Allow disabling inflight stats to save memory usage
+ if (isUseTopicSubscriptionInflightStats()) {
+ synchronized(dispatchLock) {
+ boolean inAckRange = false;
+ List<DispatchedNode> removeList = new ArrayList<>();
+ for (final DispatchedNode node : dispatched) {
+ MessageId messageId = node.getMessageId();
+ if (ack.getFirstMessageId() == null
+ || ack.getFirstMessageId().equals(messageId)) {
+ inAckRange = true;
+ }
+ if (inAckRange) {
+ removeList.add(node);
+ if (ack.getLastMessageId().equals(messageId)) {
+ break;
+ }
}
}
- }
- for (final DispatchedNode node : removeList) {
- dispatched.remove(node);
- getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
- getSubscriptionStatistics().getDequeues().increment();
-
- final Destination destination = node.getDestination();
- if (destination != null) {
- destination.getDestinationStatistics().getDequeues().increment();
- destination.getDestinationStatistics().getInflight().decrement();
- if (info.isNetworkSubscription()) {
- destination.getDestinationStatistics().getForwards().increment();
- }
- if (ack.isExpiredAck()) {
- destination.getDestinationStatistics().getExpired().increment();
+ for (final DispatchedNode node : removeList) {
+ dispatched.remove(node);
+ getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+
+ final Destination destination = node.getDestination();
+ incrementStatsOnAck(destination, ack, 1);
+ if (!ack.isInTransaction()) {
+ contractPrefetchExtension(1);
}
}
- if (!ack.isInTransaction()) {
- contractPrefetchExtension(1);
- }
}
+ } else {
+ if (singleDestination && destination != null) {
+ incrementStatsOnAck(destination, ack, ack.getMessageCount());
+ }
+ if (!ack.isInTransaction()) {
+ contractPrefetchExtension(ack.getMessageCount());
+ }
+ }
+ }
+
+ private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
+ getSubscriptionStatistics().getDequeues().add(count);
+ destination.getDestinationStatistics().getDequeues().add(count);
+ destination.getDestinationStatistics().getInflight().subtract(count);
+ if (info.isNetworkSubscription()) {
+ destination.getDestinationStatistics().getForwards().add(count);
+ }
+ if (ack.isExpiredAck()) {
+ destination.getDestinationStatistics().getExpired().add(count);
}
}
@@ -653,8 +667,10 @@ public class TopicSubscription extends AbstractSubscription {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
synchronized(dispatchLock) {
getSubscriptionStatistics().getDispatched().increment();
- dispatched.add(new DispatchedNode(node));
- getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+ if (isUseTopicSubscriptionInflightStats()) {
+ dispatched.add(new DispatchedNode(node));
+ getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+ }
}
// Keep track if this subscription is receiving messages from a single destination.
@@ -764,6 +780,14 @@ public class TopicSubscription extends AbstractSubscription {
}
}
+ public boolean isUseTopicSubscriptionInflightStats() {
+ return useTopicSubscriptionInflightStats;
+ }
+
+ public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
+ this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
+ }
+
private static class DispatchedNode {
private final int size;
private final MessageId messageId;
http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index c5e5980..164b984 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry {
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
+ private boolean useTopicSubscriptionInflightStats = true;
/*
* percentage of in-flight messages above which optimize message store is disabled
@@ -323,6 +324,7 @@ public class PolicyEntry extends DestinationMapEntry {
configurePrefetch(subscription);
subscription.setUsePrefetchExtension(isUsePrefetchExtension());
subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+ subscription.setUseTopicSubscriptionInflightStats(isUseTopicSubscriptionInflightStats());
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@@ -1128,4 +1130,12 @@ public class PolicyEntry extends DestinationMapEntry {
public long getSendFailIfNoSpaceAfterTimeout() {
return this.sendFailIfNoSpaceAfterTimeout;
}
+
+ public boolean isUseTopicSubscriptionInflightStats() {
+ return useTopicSubscriptionInflightStats;
+ }
+
+ public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
+ this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
index 07784e7..a127feb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
@@ -38,6 +38,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.Wait;
import org.junit.After;
@@ -59,6 +61,7 @@ public abstract class AbstractInflightMessageSizeTest {
protected Destination amqDestination;
protected MessageConsumer consumer;
protected int prefetch = 100;
+ protected boolean useTopicSubscriptionInflightStats;
final protected int ackType;
final protected boolean optimizeAcknowledge;
final protected String destName = "testDest";
@@ -66,20 +69,29 @@ public abstract class AbstractInflightMessageSizeTest {
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- {ActiveMQSession.SESSION_TRANSACTED, true},
- {ActiveMQSession.AUTO_ACKNOWLEDGE, true},
- {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true},
- {ActiveMQSession.CLIENT_ACKNOWLEDGE, true},
- {ActiveMQSession.SESSION_TRANSACTED, false},
- {ActiveMQSession.AUTO_ACKNOWLEDGE, false},
- {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false},
- {ActiveMQSession.CLIENT_ACKNOWLEDGE, false}
+ {ActiveMQSession.SESSION_TRANSACTED, true, true},
+ {ActiveMQSession.AUTO_ACKNOWLEDGE, true, true},
+ {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, true},
+ {ActiveMQSession.CLIENT_ACKNOWLEDGE, true, true},
+ {ActiveMQSession.SESSION_TRANSACTED, false, true},
+ {ActiveMQSession.AUTO_ACKNOWLEDGE, false, true},
+ {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, true},
+ {ActiveMQSession.CLIENT_ACKNOWLEDGE, false, true},
+ {ActiveMQSession.SESSION_TRANSACTED, true, false},
+ {ActiveMQSession.AUTO_ACKNOWLEDGE, true, false},
+ {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, true, false},
+ {ActiveMQSession.CLIENT_ACKNOWLEDGE, true, false},
+ {ActiveMQSession.SESSION_TRANSACTED, false, false},
+ {ActiveMQSession.AUTO_ACKNOWLEDGE, false, false},
+ {ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE, false, false},
+ {ActiveMQSession.CLIENT_ACKNOWLEDGE, false, false}
});
}
- public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
+ public AbstractInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) {
this.ackType = ackType;
this.optimizeAcknowledge = optimizeAcknowledge;
+ this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}
@Before
@@ -88,6 +100,12 @@ public abstract class AbstractInflightMessageSizeTest {
brokerService.setDeleteAllMessagesOnStartup(true);
TransportConnector tcp = brokerService
.addConnector("tcp://localhost:0");
+ PolicyEntry policy = new PolicyEntry();
+ policy.setUseTopicSubscriptionInflightStats(useTopicSubscriptionInflightStats);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ brokerService.setDestinationPolicy(pMap);
+
brokerService.start();
//used to test optimizeAcknowledge works
String optAckString = optimizeAcknowledge ? "?jms.optimizeAcknowledge=true&jms.optimizedAckScheduledAckInterval=2000" : "";
@@ -129,6 +147,8 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSize() throws Exception {
+ Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
final long size = sendMessages(10);
assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
@@ -155,6 +175,8 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchFilled() throws Exception {
+ Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
final long size = sendMessages(prefetch);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@@ -187,6 +209,8 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSizePrefetchNotFilled() throws Exception {
+ Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
final long size = sendMessages(prefetch - 10);
assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
@@ -227,6 +251,7 @@ public abstract class AbstractInflightMessageSizeTest {
*/
@Test(timeout=15000)
public void testInflightMessageSizeRollback() throws Exception {
+ Assume.assumeTrue(useTopicSubscriptionInflightStats);
Assume.assumeTrue(ackType == ActiveMQSession.SESSION_TRANSACTED);
final long size = sendMessages(10);
http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
index 29d6cb7..a7e9473 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
@@ -34,8 +34,9 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
- public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
- super(ackType, optimizeAcknowledge);
+ public DurableSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge,
+ boolean useTopicSubscriptionInflightStats) {
+ super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
index 84ddc71..217aefb 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
@@ -34,8 +34,9 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
- public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
- super(ackType, optimizeAcknowledge);
+ public QueueSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge,
+ boolean useTopicSubscriptionInflightStats) {
+ super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/65b0f2ad/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
index 797d409..132a96d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.statistics;
+import static org.junit.Assert.assertTrue;
+
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -23,6 +25,9 @@ import javax.jms.MessageConsumer;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.Assume;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -33,8 +38,8 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMessageSizeTest {
- public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge) {
- super(ackType, optimizeAcknowledge);
+ public TopicSubscriptionInflightMessageSizeTest(int ackType, boolean optimizeAcknowledge, boolean useTopicSubscriptionInflightStats) {
+ super(ackType, optimizeAcknowledge, useTopicSubscriptionInflightStats);
}
@Override
@@ -57,4 +62,29 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe
return new ActiveMQTopic(destName);
}
+ @Test(timeout=15000)
+ public void testInflightMessageSizeDisabled() throws Exception {
+ Assume.assumeFalse(useTopicSubscriptionInflightStats);
+ sendMessages(10);
+
+ Thread.sleep(1000);
+
+ assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getSubscription().getInFlightMessageSize() == 0;
+ }
+ }));
+
+ receiveMessages(10);
+
+ Thread.sleep(1000);
+ assertTrue("Inflight message size should still be 0", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getSubscription().getInFlightMessageSize() == 0;
+ }
+ }));
+ }
+
}