You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/09/10 10:46:18 UTC
[activemq] branch master updated: AMQ-7302 - make jmx ops that pageIn aware of cursor memory limits to avoid excessive looping, fix and test
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 501d553 AMQ-7302 - make jmx ops that pageIn aware of cursor memory limits to avoid excessive looping, fix and test
501d553 is described below
commit 501d55337adaecdad7fb6311afcf618503a9e9b5
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Sep 10 11:46:04 2019 +0100
AMQ-7302 - make jmx ops that pageIn aware of cursor memory limits to avoid excessive looping, fix and test
---
.../org/apache/activemq/broker/jmx/QueueView.java | 13 +-
.../org/apache/activemq/broker/region/Queue.java | 20 ++-
.../broker/jmx/JmxOpPageInOnMemoryLimit.java | 149 +++++++++++++++++++++
3 files changed, 173 insertions(+), 9 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
index 372bb80..64a4c27 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
@@ -64,7 +64,7 @@ public class QueueView extends DestinationView implements QueueViewMBean {
LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount);
}
- public boolean removeMessage(String messageId) throws Exception {
+ public synchronized boolean removeMessage(String messageId) throws Exception {
return ((Queue)destination).removeMessage(messageId);
}
@@ -76,25 +76,25 @@ public class QueueView extends DestinationView implements QueueViewMBean {
return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
}
- public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
+ public synchronized boolean copyMessageTo(String messageId, String destinationName) throws Exception {
ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
}
- public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
+ public synchronized int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
}
- public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
+ public synchronized int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
}
- public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
+ public synchronized boolean moveMessageTo(String messageId, String destinationName) throws Exception {
ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
@@ -123,6 +123,9 @@ public class QueueView extends DestinationView implements QueueViewMBean {
public boolean retryMessage(String messageId) throws Exception {
Queue queue = (Queue) destination;
QueueMessageReference ref = queue.getMessage(messageId);
+ if (ref == null) {
+ throw new JMSException("Could not find message reference: "+ messageId);
+ }
Message rc = ref.getMessage();
if (rc != null) {
ActiveMQDestination originalDestination = rc.getOriginalDestination();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index fa75752..86b8c6d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1405,7 +1405,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
doPageIn(true);
pagedInMessagesLock.readLock().lock();
try {
- set.addAll(pagedInMessages.values());
+ if (!set.addAll(pagedInMessages.values())) {
+ // nothing new to check - mem constraint on page in
+ return movedCounter;
+ };
} finally {
pagedInMessagesLock.readLock().unlock();
}
@@ -1474,7 +1477,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
doPageIn(true, false, (messages.isCacheEnabled() || !broker.getBrokerService().isPersistent()) ? messages.size() : getMaxBrowsePageSize());
pagedInMessagesLock.readLock().lock();
try {
- set.addAll(pagedInMessages.values());
+ if (!set.addAll(pagedInMessages.values())) {
+ // nothing new to check - mem constraint on page in
+ return movedCounter;
+ }
} finally {
pagedInMessagesLock.readLock().unlock();
}
@@ -1591,7 +1597,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
doPageIn(true);
pagedInMessagesLock.readLock().lock();
try {
- set.addAll(pagedInMessages.values());
+ if (!set.addAll(pagedInMessages.values())) {
+ // nothing new to check - mem constraint on page in
+ return movedCounter;
+ }
} finally {
pagedInMessagesLock.readLock().unlock();
}
@@ -1623,7 +1632,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
doPageIn(true);
pagedInMessagesLock.readLock().lock();
try {
- set.addAll(pagedInMessages.values());
+ if (!set.addAll(pagedInMessages.values())) {
+ // nothing new to check - mem constraint on page in
+ return restoredCounter;
+ }
} finally {
pagedInMessagesLock.readLock().unlock();
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
new file mode 100644
index 0000000..459bec1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/JmxOpPageInOnMemoryLimit.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import static org.junit.Assert.*;
+
+// https://issues.apache.org/jira/browse/AMQ-7302
+public class JmxOpPageInOnMemoryLimit {
+
+ BrokerService broker;
+ protected MBeanServer mbeanServer;
+ protected String domain = "org.apache.activemq";
+
+ protected Connection connection;
+ protected int messageCount = 4000;
+ ActiveMQQueue destination = new ActiveMQQueue("QUEUE_TO_FILL_PAST_MEM_LIMIT");
+ String lastMessageId = "";
+
+ @Test(timeout = 60*1000)
+ public void testNoHangOnPageInForJmxOps() throws Exception {
+
+ // Now get the QueueViewMBean and ...
+ String objectNameStr = broker.getBrokerObjectName().toString();
+ objectNameStr += ",destinationType=Queue,destinationName="+destination.getQueueName();
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr);
+ final QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ assertFalse("limit reached, cache disabled", proxy.isCacheEnabled());
+
+ proxy.removeMessage(lastMessageId);
+
+ proxy.copyMessageTo(lastMessageId, "someOtherQ");
+
+ proxy.moveMatchingMessagesTo("JMSMessageID = '" + lastMessageId + "'","someOtherQ");
+
+
+ // flick dlq flag to allow retry work
+ proxy.setDLQ(true);
+ proxy.retryMessages();
+
+ try {
+ proxy.retryMessage(lastMessageId);
+ } catch (JMSException expected) {
+ assertTrue("Could not find", expected.getMessage().contains("find"));
+ }
+
+ long count = proxy.getQueueSize();
+ boolean cursorFull = proxy.getCursorPercentUsage() >= 70;
+ assertTrue("Cursor full", cursorFull);
+
+ assertEquals("Queue size", messageCount, count);
+ }
+
+ private String produceMessages() throws Exception {
+ connection = createConnectionFactory().createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String trackLastMessageId = "";
+ MessageProducer producer = session.createProducer(destination);
+ final byte[] payload = new byte[1024];
+ for (int i = 0; i < messageCount; i++) {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(payload);
+ producer.send(message);
+ trackLastMessageId = message.getJMSMessageID();
+ }
+ producer.close();
+ connection.close();
+ return trackLastMessageId;
+ }
+
+
+ protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
+ ObjectName objectName = new ObjectName(name);
+ if (!mbeanServer.isRegistered(objectName)) {
+ fail("Could not find MBean!: " + objectName);
+ }
+ return objectName;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ createBroker();
+ mbeanServer = broker.getManagementContext().getMBeanServer();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ broker.stop();
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ broker = new BrokerService();
+ broker.setUseJmx(true);
+ broker.setEnableStatistics(true);
+ broker.addConnector("tcp://localhost:0");
+ ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
+
+ broker.deleteAllMessages();
+
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setMemoryLimit(1024*1024);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+ broker.start();
+ lastMessageId = produceMessages();
+ return broker;
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+ }
+
+}