You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/08/15 15:22:21 UTC
activemq git commit: AMQ-7035 - use NonCachedMessageEvaluationContext
in place of MessageEvaluationContext to avoid unnecessary reference count
management and subsequent leaks. Rework AMQ-6465 with additional JMX related
tests
Repository: activemq
Updated Branches:
refs/heads/master d8c80a982 -> 50d27e7e5
AMQ-7035 - use NonCachedMessageEvaluationContext in place of MessageEvaluationContext to avoid unnecessary reference count management and subsequent leaks. Rework AMQ-6465 with additional JMX related tests
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/50d27e7e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/50d27e7e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/50d27e7e
Branch: refs/heads/master
Commit: 50d27e7e545d30bc0d35f8dd8baf15b33522c33a
Parents: d8c80a9
Author: gtully <ga...@gmail.com>
Authored: Wed Aug 15 16:21:57 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Aug 15 16:21:57 2018 +0100
----------------------------------------------------------------------
.../activemq/broker/ConnectionContext.java | 3 +-
.../activemq/broker/jmx/DestinationView.java | 8 +--
.../activemq/broker/region/BaseDestination.java | 2 +-
.../cursors/FilePendingMessageCursor.java | 2 +-
.../network/DemandForwardingBridgeSupport.java | 7 +--
.../apache/activemq/broker/jmx/MBeanTest.java | 56 ++++++++++++++++----
.../apache/activemq/selector/SelectorTest.java | 2 +
.../selector/UnknownHandlingSelectorTest.java | 5 +-
.../usecases/LargeQueueSparseDeleteTest.java | 9 ++--
9 files changed, 63 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
index 8c4db9a..3eba5d8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
@@ -27,6 +27,7 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ConnectionState;
@@ -64,7 +65,7 @@ public class ConnectionContext {
private XATransactionId xid;
public ConnectionContext() {
- this.messageEvaluationContext = new MessageEvaluationContext();
+ this.messageEvaluationContext = new NonCachedMessageEvaluationContext();
}
public ConnectionContext(MessageEvaluationContext messageEvaluationContext) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 769d796..3055b57 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -51,7 +51,7 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
@@ -213,7 +213,7 @@ public class DestinationView implements DestinationViewMBean {
Message[] messages = destination.browse();
ArrayList<CompositeData> c = new ArrayList<CompositeData>();
- MessageEvaluationContext ctx = new MessageEvaluationContext();
+ NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());
BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
@@ -256,7 +256,7 @@ public class DestinationView implements DestinationViewMBean {
Message[] messages = destination.browse();
ArrayList<Object> answer = new ArrayList<Object>();
- MessageEvaluationContext ctx = new MessageEvaluationContext();
+ NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());
BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
@@ -297,7 +297,7 @@ public class DestinationView implements DestinationViewMBean {
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
TabularDataSupport rc = new TabularDataSupport(tt);
- MessageEvaluationContext ctx = new MessageEvaluationContext();
+ NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
ctx.setDestination(destination.getActiveMQDestination());
BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 9681fbe..5ab6737 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -851,7 +851,7 @@ public abstract class BaseDestination implements Destination {
}
public ConnectionContext createConnectionContext() {
- ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
+ ConnectionContext answer = new ConnectionContext();
answer.setBroker(this.broker);
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 20b2bc5..f23d817 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -484,7 +484,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private void discardExpiredMessage(MessageReference reference) {
LOG.debug("Discarding expired message {}", reference);
if (reference.isExpired() && broker.isExpired(reference)) {
- ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
+ ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 394cccd..7ce0339 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -86,7 +86,7 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
@@ -1303,13 +1303,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// for durable subs, suppression via filter leaves dangling acks so we
// need to check here and allow the ack irrespective
if (sub.getLocalInfo().isDurable()) {
- MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
+ NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage());
messageEvalContext.setDestination(md.getDestination());
suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
- //AMQ-6465 - Need to decrement the reference count after checking matches() as
- //the call above will increment the reference count by 1
- messageEvalContext.getMessageReference().decrementReferenceCount();
}
return suppress;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index 0ccf1cb..8ecfee3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -452,15 +452,19 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
- queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT-3;
- assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+ assertEquals("Unexpected number of messages ",movedSize,queueNew.getQueueSize());
// now lets remove them by selector
- queue.removeMatchingMessages("counter > 2");
+ queueNew.removeMatchingMessages("counter > 2");
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueNew.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queueNew.getMemoryPercentUsage());
+ assertEquals("dest has 0 memory usage", 0, queueNew.getMemoryUsageByteCount());
+
+ queue.purge();
+ assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
}
public void testCopyMessagesBySelector() throws Exception {
@@ -478,17 +482,47 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
- queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ QueueViewMBean queueTwo = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
- LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
- assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
+ LOG.info("Queue: " + queueViewMBeanName + " now has: " + queueTwo.getQueueSize() + " message(s)");
+ assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queueTwo.getQueueSize());
// now lets remove them by selector
- queue.removeMatchingMessages("counter > 2");
+ queueTwo.removeMatchingMessages("counter > 2");
- assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
- assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+ assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueTwo.getQueueSize());
+ assertEquals("dest has no memory usage", 0, queueTwo.getMemoryPercentUsage());
+ assertEquals("dest has 0 memory usage", 0, queueTwo.getMemoryUsageByteCount());
+
+ queue.purge();
+ assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
}
+ public void testSelectorBrowseUsage() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
+
+ final String someSelectorExp = "JMSType = '22'";
+ QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+ queue.browse(someSelectorExp);
+ queue.purge();
+ assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
+
+ connection.close();
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+ queue.browseMessages(someSelectorExp);
+ queue.purge();
+ assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
+
+ connection.close();
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+ queue.browseAsTable(someSelectorExp);
+ queue.purge();
+ assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
+ }
public void testCopyPurgeCopyBack() throws Exception {
connection = connectionFactory.createConnection();
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
index c65674d..66a7af4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
@@ -405,7 +405,9 @@ public class SelectorTest extends TestCase {
MessageEvaluationContext context = new MessageEvaluationContext();
context.setMessageReference((org.apache.activemq.command.Message)message);
boolean value = selector.matches(context);
+ context.clear();
assertEquals("Selector for: " + text, expected, value);
+ assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount());
}
protected Message createMessage(String subject) throws JMSException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
index 5c9a8ee..f580b2e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
@@ -25,7 +25,7 @@ import javax.jms.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.junit.Before;
import org.junit.Test;
@@ -170,10 +170,11 @@ public class UnknownHandlingSelectorTest {
protected void assertSelector(String text, boolean matches) throws JMSException {
BooleanExpression selector = SelectorParser.parse(text);
assertTrue("Created a valid selector", selector != null);
- MessageEvaluationContext context = new MessageEvaluationContext();
+ NonCachedMessageEvaluationContext context = new NonCachedMessageEvaluationContext();
context.setMessageReference((org.apache.activemq.command.Message)message);
boolean value = selector.matches(context);
assertEquals("Selector for: " + text, matches, value);
+ assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount());
}
private static String not(String selector) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
index f69f1b7..dbd3f61 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
@@ -84,8 +84,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
destination);
- ConnectionContext context = new ConnectionContext(
- new NonCachedMessageEvaluationContext());
+ ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);
@@ -133,8 +132,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
destination);
- ConnectionContext context = new ConnectionContext(
- new NonCachedMessageEvaluationContext());
+ ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);
@@ -179,8 +177,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
destination);
- ConnectionContext context = new ConnectionContext(
- new NonCachedMessageEvaluationContext());
+ ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBroker());
context.getMessageEvaluationContext().setDestination(destination);