You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/09/10 10:46:18 UTC

[activemq] branch master updated: AMQ-7302 - make jmx ops that pageIn aware of cursor memory limits to avoid excessive looping, fix and test

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 501d553  AMQ-7302 - make jmx ops that pageIn aware of cursor memory limits to avoid excessive looping, fix and test
501d553 is described below

commit 501d55337adaecdad7fb6311afcf618503a9e9b5
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Sep 10 11:46:04 2019 +0100

    AMQ-7302 - make jmx ops that pageIn aware of cursor memory limits to avoid excessive looping, fix and test
---
 .../org/apache/activemq/broker/jmx/QueueView.java  |  13 +-
 .../org/apache/activemq/broker/region/Queue.java   |  20 ++-
 .../broker/jmx/JmxOpPageInOnMemoryLimit.java       | 149 +++++++++++++++++++++
 3 files changed, 173 insertions(+), 9 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
index 372bb80..64a4c27 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
@@ -64,7 +64,7 @@ public class QueueView extends DestinationView implements QueueViewMBean {
         LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount);
     }
 
-    public boolean removeMessage(String messageId) throws Exception {
+    public synchronized boolean removeMessage(String messageId) throws Exception {
         return ((Queue)destination).removeMessage(messageId);
     }
 
@@ -76,25 +76,25 @@ public class QueueView extends DestinationView implements QueueViewMBean {
         return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
     }
 
-    public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
+    public synchronized boolean copyMessageTo(String messageId, String destinationName) throws Exception {
         ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
         ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
         return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
     }
 
-    public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
+    public synchronized int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
         ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
         ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
         return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
     }
 
-    public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
+    public synchronized int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
         ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
         ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
         return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
     }
 
-    public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
+    public synchronized boolean moveMessageTo(String messageId, String destinationName) throws Exception {
         ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
         ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
         return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
@@ -123,6 +123,9 @@ public class QueueView extends DestinationView implements QueueViewMBean {
     public boolean retryMessage(String messageId) throws Exception {
         Queue queue = (Queue) destination;
         QueueMessageReference ref = queue.getMessage(messageId);
+        if (ref == null) {
+            throw new JMSException("Could not find message reference: "+ messageId);
+        }
         Message rc = ref.getMessage();
         if (rc != null) {
             ActiveMQDestination originalDestination = rc.getOriginalDestination();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index fa75752..86b8c6d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1405,7 +1405,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
             try {
-                set.addAll(pagedInMessages.values());
+                if (!set.addAll(pagedInMessages.values())) {
+                    // nothing new to check - mem constraint on page in
+                    return movedCounter;
+                };
             } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
@@ -1474,7 +1477,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             doPageIn(true, false, (messages.isCacheEnabled() || !broker.getBrokerService().isPersistent()) ? messages.size() : getMaxBrowsePageSize());
             pagedInMessagesLock.readLock().lock();
             try {
-                set.addAll(pagedInMessages.values());
+                if (!set.addAll(pagedInMessages.values())) {
+                    // nothing new to check - mem constraint on page in
+                    return movedCounter;
+                }
             } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
@@ -1591,7 +1597,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
             try {
-                set.addAll(pagedInMessages.values());
+                if (!set.addAll(pagedInMessages.values())) {
+                    // nothing new to check - mem constraint on page in
+                    return movedCounter;
+                }
             } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
@@ -1623,7 +1632,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             doPageIn(true);
             pagedInMessagesLock.readLock().lock();
             try {
-                set.addAll(pagedInMessages.values());
+                if (!set.addAll(pagedInMessages.values())) {
+                    // nothing new to check - mem constraint on page in
+                    return restoredCounter;
+                }
             } finally {
                 pagedInMessagesLock.readLock().unlock();
             }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
new file mode 100644
index 0000000..459bec1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import static org.junit.Assert.*;
+
+// https://issues.apache.org/jira/browse/AMQ-7302
+public class JmxOpPageInOnMemoryLimit {
+
+    BrokerService broker;
+    protected MBeanServer mbeanServer;
+    protected String domain = "org.apache.activemq";
+
+    protected Connection connection;
+    protected int messageCount = 4000;
+    ActiveMQQueue destination = new ActiveMQQueue("QUEUE_TO_FILL_PAST_MEM_LIMIT");
+    String lastMessageId = "";
+
+    @Test(timeout = 60*1000)
+    public void testNoHangOnPageInForJmxOps() throws Exception {
+
+        // Now get the QueueViewMBean and ...
+        String objectNameStr = broker.getBrokerObjectName().toString();
+        objectNameStr += ",destinationType=Queue,destinationName="+destination.getQueueName();
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr);
+        final QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertFalse("limit reached, cache disabled", proxy.isCacheEnabled());
+
+        proxy.removeMessage(lastMessageId);
+
+        proxy.copyMessageTo(lastMessageId, "someOtherQ");
+
+        proxy.moveMatchingMessagesTo("JMSMessageID = '" + lastMessageId +  "'","someOtherQ");
+
+
+        // flick dlq flag to allow retry work
+        proxy.setDLQ(true);
+        proxy.retryMessages();
+
+        try {
+            proxy.retryMessage(lastMessageId);
+        } catch (JMSException expected) {
+            assertTrue("Could not find", expected.getMessage().contains("find"));
+        }
+
+        long count = proxy.getQueueSize();
+        boolean cursorFull = proxy.getCursorPercentUsage() >= 70;
+        assertTrue("Cursor full", cursorFull);
+
+        assertEquals("Queue size", messageCount, count);
+    }
+
+    private String produceMessages() throws Exception {
+        connection = createConnectionFactory().createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        String trackLastMessageId = "";
+        MessageProducer producer = session.createProducer(destination);
+        final byte[] payload = new byte[1024];
+        for (int i = 0; i < messageCount; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(payload);
+            producer.send(message);
+            trackLastMessageId = message.getJMSMessageID();
+        }
+        producer.close();
+        connection.close();
+        return trackLastMessageId;
+    }
+
+
+    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
+        ObjectName objectName = new ObjectName(name);
+        if (!mbeanServer.isRegistered(objectName)) {
+            fail("Could not find MBean!: " + objectName);
+        }
+        return objectName;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        createBroker();
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        broker.stop();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setEnableStatistics(true);
+        broker.addConnector("tcp://localhost:0");
+        ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+
+        broker.deleteAllMessages();
+
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setMemoryLimit(1024*1024);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+        lastMessageId = produceMessages();
+        return broker;
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+}