You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/10/09 15:41:10 UTC
[activemq-artemis] branch master updated: ARTEMIS-2508 Crititical
analyser trigger shutdown if removeAllMessages
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 28d1a53 ARTEMIS-2508 Crititical analyser trigger shutdown if removeAllMessages
new 68f419d This closes #2855
28d1a53 is described below
commit 28d1a53630bea2ce9ef137cda4724fbf106dd96e
Author: brusdev <br...@gmail.com>
AuthorDate: Tue Oct 8 06:26:16 2019 +0200
ARTEMIS-2508 Crititical analyser trigger shutdown if removeAllMessages
The critical analyser triggers a broker shutdown when removeAllMessages
is called on a huge queue. iterQueue is now split into separate steps so
that it does not hold the queue lock for too long.
---
.../artemis/core/server/impl/QueueImpl.java | 70 +++++++++---------
.../tests/integration/paging/PagingTest.java | 86 ++++++++++++++++++++++
2 files changed, 122 insertions(+), 34 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f48e430..881d398 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1915,7 +1915,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
+ public int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason));
}
@@ -1947,7 +1947,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* @return
* @throws Exception
*/
- private synchronized int iterQueue(final int flushLimit,
+ private int iterQueue(final int flushLimit,
final Filter filter1,
QueueIterateAction messageAction) throws Exception {
int count = 0;
@@ -1955,45 +1955,47 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Transaction tx = new TransactionImpl(storageManager);
- try (LinkedListIterator<MessageReference> iter = iterator()) {
+ synchronized (this) {
+ try (LinkedListIterator<MessageReference> iter = iterator()) {
- while (iter.hasNext()) {
- MessageReference ref = iter.next();
+ while (iter.hasNext()) {
+ MessageReference ref = iter.next();
- if (ref.isPaged() && queueDestroyed) {
- // this means the queue is being removed
- // hence paged references are just going away through
- // page cleanup
- continue;
- }
+ if (ref.isPaged() && queueDestroyed) {
+ // this means the queue is being removed
+ // hence paged references are just going away through
+ // page cleanup
+ continue;
+ }
- if (filter1 == null || filter1.match(ref.getMessage())) {
- messageAction.actMessage(tx, ref);
- iter.remove();
- txCount++;
- count++;
+ if (filter1 == null || filter1.match(ref.getMessage())) {
+ messageAction.actMessage(tx, ref);
+ iter.remove();
+ txCount++;
+ count++;
+ }
}
- }
- if (txCount > 0) {
- tx.commit();
+ if (txCount > 0) {
+ tx.commit();
- tx = new TransactionImpl(storageManager);
+ tx = new TransactionImpl(storageManager);
- txCount = 0;
- }
+ txCount = 0;
+ }
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
- for (MessageReference messageReference : cancelled) {
- messageAction.actMessage(tx, messageReference, false);
- count++;
- txCount++;
- }
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
+ for (MessageReference messageReference : cancelled) {
+ messageAction.actMessage(tx, messageReference, false);
+ count++;
+ txCount++;
+ }
- if (txCount > 0) {
- tx.commit();
- tx = new TransactionImpl(storageManager);
- txCount = 0;
+ if (txCount > 0) {
+ tx.commit();
+ tx = new TransactionImpl(storageManager);
+ txCount = 0;
+ }
}
if (pageIterator != null && !queueDestroyed) {
@@ -2350,7 +2352,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized int moveReferences(final int flushLimit,
+ public int moveReferences(final int flushLimit,
final Filter filter,
final SimpleString toAddress,
final boolean rejectDuplicates,
@@ -2384,7 +2386,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
});
}
- public synchronized int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
+ public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 19eee08..8e816d2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -57,6 +57,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
@@ -493,6 +495,90 @@ public class PagingTest extends ActiveMQTestBase {
}
@Test
+ public void testQueueRemoveAll() throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+ server.start();
+
+ final int numberOfMessages = 5000;
+
+ locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+ producer.send(session.createMessage(true));
+ session.rollback();
+ producer.close();
+ session.close();
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount);
+
+ QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS);
+ int removedMessages = queueControl.removeAllMessages();
+
+ Assert.assertEquals(numberOfMessages * 2, removedMessages);
+ }
+
+ @Test
public void testEmptyAddress() throws Exception {
if (storeType == StoreConfiguration.StoreType.FILE) {
clearDataRecreateServerDirs();