You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/08/15 15:22:21 UTC

activemq git commit: AMQ-7035 - use NonCachedMessageEvaluationContext in place of MessageEvaluationContext to avoid unnecessary reference count management and subsequent leaks. Rework AMQ-6465 with additional JMX-related tests

Repository: activemq
Updated Branches:
  refs/heads/master d8c80a982 -> 50d27e7e5


AMQ-7035 - use NonCachedMessageEvaluationContext in place of MessageEvaluationContext to avoid unnecessary reference count management and subsequent leaks. Rework AMQ-6465 with additional JMX-related tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/50d27e7e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/50d27e7e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/50d27e7e

Branch: refs/heads/master
Commit: 50d27e7e545d30bc0d35f8dd8baf15b33522c33a
Parents: d8c80a9
Author: gtully <ga...@gmail.com>
Authored: Wed Aug 15 16:21:57 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Aug 15 16:21:57 2018 +0100

----------------------------------------------------------------------
 .../activemq/broker/ConnectionContext.java      |  3 +-
 .../activemq/broker/jmx/DestinationView.java    |  8 +--
 .../activemq/broker/region/BaseDestination.java |  2 +-
 .../cursors/FilePendingMessageCursor.java       |  2 +-
 .../network/DemandForwardingBridgeSupport.java  |  7 +--
 .../apache/activemq/broker/jmx/MBeanTest.java   | 56 ++++++++++++++++----
 .../apache/activemq/selector/SelectorTest.java  |  2 +
 .../selector/UnknownHandlingSelectorTest.java   |  5 +-
 .../usecases/LargeQueueSparseDeleteTest.java    |  9 ++--
 9 files changed, 63 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
index 8c4db9a..3eba5d8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
@@ -27,6 +27,7 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ConnectionState;
@@ -64,7 +65,7 @@ public class ConnectionContext {
     private XATransactionId xid;
 
     public ConnectionContext() {
-        this.messageEvaluationContext = new MessageEvaluationContext();
+        this.messageEvaluationContext = new NonCachedMessageEvaluationContext();
     }
 
     public ConnectionContext(MessageEvaluationContext messageEvaluationContext) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 769d796..3055b57 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -51,7 +51,7 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.util.URISupport;
@@ -213,7 +213,7 @@ public class DestinationView implements DestinationViewMBean {
         Message[] messages = destination.browse();
         ArrayList<CompositeData> c = new ArrayList<CompositeData>();
 
-        MessageEvaluationContext ctx = new MessageEvaluationContext();
+        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
         ctx.setDestination(destination.getActiveMQDestination());
         BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
 
@@ -256,7 +256,7 @@ public class DestinationView implements DestinationViewMBean {
         Message[] messages = destination.browse();
         ArrayList<Object> answer = new ArrayList<Object>();
 
-        MessageEvaluationContext ctx = new MessageEvaluationContext();
+        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
         ctx.setDestination(destination.getActiveMQDestination());
         BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
 
@@ -297,7 +297,7 @@ public class DestinationView implements DestinationViewMBean {
         TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
         TabularDataSupport rc = new TabularDataSupport(tt);
 
-        MessageEvaluationContext ctx = new MessageEvaluationContext();
+        NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext();
         ctx.setDestination(destination.getActiveMQDestination());
         BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 9681fbe..5ab6737 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -851,7 +851,7 @@ public abstract class BaseDestination implements Destination {
     }
 
     public ConnectionContext createConnectionContext() {
-        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
+        ConnectionContext answer = new ConnectionContext();
         answer.setBroker(this.broker);
         answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
         answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 20b2bc5..f23d817 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -484,7 +484,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
     private void discardExpiredMessage(MessageReference reference) {
         LOG.debug("Discarding expired message {}", reference);
         if (reference.isExpired() && broker.isExpired(reference)) {
-            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
+            ConnectionContext context = new ConnectionContext();
             context.setBroker(broker);
             ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 394cccd..7ce0339 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -86,7 +86,7 @@ import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
@@ -1303,13 +1303,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         // for durable subs, suppression via filter leaves dangling acks so we
         // need to check here and allow the ack irrespective
         if (sub.getLocalInfo().isDurable()) {
-            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
+            NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext();
             messageEvalContext.setMessageReference(md.getMessage());
             messageEvalContext.setDestination(md.getDestination());
             suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
-            //AMQ-6465 - Need to decrement the reference count after checking matches() as
-            //the call above will increment the reference count by 1
-            messageEvalContext.getMessageReference().decrementReferenceCount();
         }
         return suppress;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index 0ccf1cb..8ecfee3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -452,15 +452,19 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
 
-        queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
         int movedSize = MESSAGE_COUNT-3;
-        assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
+        assertEquals("Unexpected number of messages ",movedSize,queueNew.getQueueSize());
 
         // now lets remove them by selector
-        queue.removeMatchingMessages("counter > 2");
+        queueNew.removeMatchingMessages("counter > 2");
 
-        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
-        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueNew.getQueueSize());
+        assertEquals("dest has no memory usage", 0, queueNew.getMemoryPercentUsage());
+        assertEquals("dest has 0 memory usage", 0, queueNew.getMemoryUsageByteCount());
+
+        queue.purge();
+        assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
     }
 
     public void testCopyMessagesBySelector() throws Exception {
@@ -478,17 +482,47 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
 
         queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination);
 
-        queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queueTwo = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
 
-        LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
-        assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
+        LOG.info("Queue: " + queueViewMBeanName + " now has: " + queueTwo.getQueueSize() + " message(s)");
+        assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queueTwo.getQueueSize());
         // now lets remove them by selector
-        queue.removeMatchingMessages("counter > 2");
+        queueTwo.removeMatchingMessages("counter > 2");
 
-        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
-        assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
+        assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueTwo.getQueueSize());
+        assertEquals("dest has no memory usage", 0, queueTwo.getMemoryPercentUsage());
+        assertEquals("dest has 0 memory usage", 0, queueTwo.getMemoryUsageByteCount());
+
+        queue.purge();
+        assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
     }
 
+    public void testSelectorBrowseUsage() throws Exception {
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
+
+        final String someSelectorExp = "JMSType = '22'";
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+        queue.browse(someSelectorExp);
+        queue.purge();
+        assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
+
+        connection.close();
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+        queue.browseMessages(someSelectorExp);
+        queue.purge();
+        assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
+
+        connection.close();
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+        queue.browseAsTable(someSelectorExp);
+        queue.purge();
+        assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount());
+    }
 
     public void testCopyPurgeCopyBack() throws Exception {
         connection = connectionFactory.createConnection();

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
index c65674d..66a7af4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java
@@ -405,7 +405,9 @@ public class SelectorTest extends TestCase {
         MessageEvaluationContext context = new MessageEvaluationContext();
         context.setMessageReference((org.apache.activemq.command.Message)message);
         boolean value = selector.matches(context);
+        context.clear();
         assertEquals("Selector for: " + text, expected, value);
+        assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount());
     }
 
     protected Message createMessage(String subject) throws JMSException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
index 5c9a8ee..f580b2e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java
@@ -25,7 +25,7 @@ import javax.jms.Message;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -170,10 +170,11 @@ public class UnknownHandlingSelectorTest {
     protected void assertSelector(String text, boolean matches) throws JMSException {
         BooleanExpression selector = SelectorParser.parse(text);
         assertTrue("Created a valid selector", selector != null);
-        MessageEvaluationContext context = new MessageEvaluationContext();
+        NonCachedMessageEvaluationContext context = new NonCachedMessageEvaluationContext();
         context.setMessageReference((org.apache.activemq.command.Message)message);
         boolean value = selector.matches(context);
         assertEquals("Selector for: " + text, matches, value);
+        assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount());
     }
 
     private static String not(String selector) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/50d27e7e/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
index f69f1b7..dbd3f61 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java
@@ -84,8 +84,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
         Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
                 destination);
 
-        ConnectionContext context = new ConnectionContext(
-                new NonCachedMessageEvaluationContext());
+        ConnectionContext context = new ConnectionContext();
         context.setBroker(broker.getBroker());
         context.getMessageEvaluationContext().setDestination(destination);
 
@@ -133,8 +132,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
         Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
                 destination);
 
-        ConnectionContext context = new ConnectionContext(
-                new NonCachedMessageEvaluationContext());
+        ConnectionContext context = new ConnectionContext();
         context.setBroker(broker.getBroker());
         context.getMessageEvaluationContext().setDestination(destination);
 
@@ -179,8 +177,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport {
         Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
                 destination);
 
-        ConnectionContext context = new ConnectionContext(
-                new NonCachedMessageEvaluationContext());
+        ConnectionContext context = new ConnectionContext();
         context.setBroker(broker.getBroker());
         context.getMessageEvaluationContext().setDestination(destination);