You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/15 17:52:06 UTC
[activemq-artemis] branch main updated: ARTEMIS-3386 Expiry
messages using too many threads
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b4aef3f ARTEMIS-3386 Expiry messages using too many threads
b4aef3f is described below
commit b4aef3fca85167aa94b25bc02ccc899fc3bcde0e
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Jul 14 17:21:28 2021 -0400
ARTEMIS-3386 Expiry messages using too many threads
---
.../org/apache/activemq/artemis/utils/Wait.java | 2 +-
.../core/postoffice/impl/PostOfficeImpl.java | 11 +++-
.../apache/activemq/artemis/core/server/Queue.java | 6 +-
.../artemis/core/server/impl/QueueImpl.java | 63 ++++++++++++++----
.../server/plugin/ActiveMQServerQueuePlugin.java | 14 ++++
.../server/impl/ScheduledDeliveryHandlerTest.java | 7 +-
.../integration/client/ExpiryAddressTest.java | 51 +++++++++++++++
.../tests/integration/server/ExpiryRunnerTest.java | 75 ++++++++++++++++------
.../tests/unit/core/postoffice/impl/FakeQueue.java | 6 +-
9 files changed, 197 insertions(+), 38 deletions(-)
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
index 50596e6..cd3c0c3 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
@@ -66,7 +66,7 @@ public class Wait {
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis) throws Exception {
- assertEquals(size, condition, timeout, sleepMillis, true);
+ assertEquals(size, condition, timeout, sleepMillis, false);
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis, boolean printThreadDump) throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index acb897c..7fbd3f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -30,9 +30,11 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@@ -1824,7 +1826,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// This is to avoid leaks on PostOffice between stops and starts
for (Queue queue : iterableOf(getLocalQueues())) {
try {
- queue.expireReferences();
+ CountDownLatch latch = new CountDownLatch(1);
+ queue.expireReferences(latch::countDown);
+ // The idea is to block the Reaper while the Queue is running its expiry scan.
+ // This prevents another expiry from being requested if the reaping period is too short.
+ // It should also avoid bursts in CPU consumption caused by expiry reaping.
+ if (!latch.await(10, TimeUnit.SECONDS)) {
+ ActiveMQServerLogger.LOGGER.errorExpiringMessages(new TimeoutException(queue.getName().toString()));
+ }
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 193d0ed..af9a50e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -325,7 +325,11 @@ public interface Queue extends Bindable,CriticalComponent {
*/
int expireReferences(Filter filter) throws Exception;
- void expireReferences() throws Exception;
+ default void expireReferences() {
+ expireReferences((Runnable)null);
+ }
+
+ void expireReferences(Runnable done);
void expire(MessageReference ref) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d97891d..37af7e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2380,14 +2380,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public void expireReferences() {
+ public void expireReferences(Runnable done) {
if (isExpirationRedundant()) {
+ if (done != null) {
+ done.run();
+ }
return;
}
+
if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
- expiryScanner.scannerRunning.incrementAndGet();
+ if (expiryScanner.scannerRunning.incrementAndGet() == 1) {
+ expiryScanner.doneCallback = done;
+ }
getExecutor().execute(expiryScanner);
+ } else {
+ // expire is already happening on this queue, move on!
+ if (done != null) {
+ done.run();
+ }
}
}
@@ -2405,13 +2416,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
class ExpiryScanner implements Runnable {
+ public Runnable doneCallback;
public AtomicInteger scannerRunning = new AtomicInteger(0);
+ LinkedListIterator<MessageReference> iter = null;
@Override
public void run() {
boolean expired = false;
boolean hasElements = false;
+ int elementsIterated = 0;
int elementsExpired = 0;
LinkedList<MessageReference> expiredMessages = new LinkedList<>();
@@ -2424,32 +2438,54 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug("Scanning for expires on " + QueueImpl.this.getName());
}
- LinkedListIterator<MessageReference> iter = iterator();
+ if (iter == null) {
+ if (server.hasBrokerQueuePlugins()) {
+ try {
+ server.callBrokerQueuePlugins((p) -> p.beforeExpiryScan(QueueImpl.this));
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ iter = iterator();
+ }
try {
while (postOffice.isStarted() && iter.hasNext()) {
hasElements = true;
MessageReference ref = iter.next();
if (ref.getMessage().isExpired()) {
+ elementsExpired++;
incDelivering(ref);
expired = true;
expiredMessages.add(ref);
iter.remove();
-
- if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
- logger.debug("Breaking loop of expiring");
- scannerRunning.incrementAndGet();
- getExecutor().execute(this);
- break;
- }
+ }
+ if (++elementsIterated >= MAX_DELIVERIES_IN_LOOP) {
+ logger.debug("Breaking loop of expiring");
+ scannerRunning.incrementAndGet();
+ getExecutor().execute(this);
+ break;
}
}
} finally {
- try {
+ if (scannerRunning.decrementAndGet() == 0) {
+ if (server.hasBrokerQueuePlugins()) {
+ try {
+ server.callBrokerQueuePlugins((p) -> p.afterExpiryScan(QueueImpl.this));
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
iter.close();
- } catch (Throwable ignored) {
+ iter = null;
+
+ if (doneCallback != null) {
+ doneCallback.run();
+ doneCallback = null;
+ }
}
- scannerRunning.decrementAndGet();
+
logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
}
@@ -2473,7 +2509,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
logger.debug("Expired " + elementsExpired + " references");
-
}
// If empty we need to schedule depaging to make sure we would depage expired messages as well
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java
index 0852bd1..2e69b50 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java
@@ -109,4 +109,18 @@ public interface ActiveMQServerQueuePlugin extends ActiveMQServerBasePlugin {
boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
}
+
+ /**
+ * To be called before starting expiry scan on the queue
+ * @param queue
+ */
+ default void beforeExpiryScan(Queue queue) {
+ }
+
+ /**
+ * To be called after the expiry scan on the queue has finished
+ * @param queue
+ */
+ default void afterExpiryScan(Queue queue) {
+ }
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 607670a..a951626 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -864,6 +864,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void expireReferences(Runnable done) {
+
+ }
+
+ @Override
public void refDown(MessageReference messageReference) {
}
@@ -1401,7 +1406,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void expireReferences() throws Exception {
+ public void expireReferences() {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java
index cffc2ac..17e1ac3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -27,6 +30,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -74,6 +79,52 @@ public class ExpiryAddressTest extends ActiveMQTestBase {
m.acknowledge();
}
+
+ @Test
+ public void testExpireSingleMessage() throws Exception {
+ SimpleString ea = new SimpleString("EA");
+ SimpleString adSend = new SimpleString("a1");
+ SimpleString qName = new SimpleString("q1");
+ SimpleString eq = new SimpleString("EA1");
+ AddressSettings addressSettings = new AddressSettings().setExpiryAddress(ea);
+ server.getAddressSettingsRepository().addMatch("#", addressSettings);
+ clientSession.createQueue(new QueueConfiguration(eq).setAddress(ea).setDurable(false));
+ clientSession.createQueue(new QueueConfiguration(qName).setAddress(adSend).setDurable(false));
+
+
+ ClientProducer producer = clientSession.createProducer(adSend);
+
+ for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2 + 100; i++) {
+ ClientMessage clientMessage = createTextMessage(clientSession, "notExpired!");
+ clientMessage.putIntProperty("i", i);
+ producer.send(clientMessage);
+ }
+
+ ClientMessage clientMessage = createTextMessage(clientSession, "heyho!");
+ clientMessage.setExpiration(System.currentTimeMillis());
+ producer.send(clientMessage);
+
+ Queue queueQ1 = server.locateQueue("q1");
+
+ CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ // done should be called even for the ones that are ignored because the expiry is already running
+ queueQ1.expireReferences(latch::countDown);
+ }
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ clientSession.start();
+ ClientConsumer clientConsumer = clientSession.createConsumer(eq);
+ ClientMessage m = clientConsumer.receive(5000);
+ Assert.assertNotNull(m);
+ Assert.assertEquals(qName.toString(), m.getStringProperty(Message.HDR_ORIGINAL_QUEUE));
+ Assert.assertEquals(adSend.toString(), m.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+ Assert.assertNotNull(m);
+ Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+ m.acknowledge();
+ }
+
@Test
public void testBasicSendWithRetroActiveAddressSettings() throws Exception {
// apply "original" address settings
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java
index a07108e..fa56e22 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java
@@ -20,8 +20,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -33,8 +35,11 @@ import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -52,6 +57,9 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
private SimpleString expiryQueue;
private SimpleString expiryAddress;
+
+ Queue artemisExpiryQueue;
+
private ServerLocator locator;
@Test
@@ -212,30 +220,57 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
thr.join();
}
- //
- // public static void main(final String[] args) throws Exception
- // {
- // for (int i = 0; i < 1000; i++)
- // {
- // TestSuite suite = new TestSuite();
- // ExpiryRunnerTest expiryRunnerTest = new ExpiryRunnerTest();
- // expiryRunnerTest.setName("testExpireWhilstConsuming");
- // suite.addTest(expiryRunnerTest);
- //
- // TestResult result = TestRunner.run(suite);
- // if (result.errorCount() > 0 || result.failureCount() > 0)
- // {
- // System.exit(1);
- // }
- // }
- // }
+
+ @Test
+ public void testManyQueuesExpire() throws Exception {
+
+ AtomicInteger currentExpiryHappening = new AtomicInteger();
+ AtomicInteger maxExpiryHappening = new AtomicInteger(0);
+
+ server.registerBrokerPlugin(new ActiveMQServerQueuePlugin() {
+ @Override
+ public void beforeExpiryScan(Queue queue) {
+ currentExpiryHappening.incrementAndGet();
+ while (!maxExpiryHappening.compareAndSet(maxExpiryHappening.get(), Math.max(maxExpiryHappening.get(), currentExpiryHappening.get()))) {
+ Thread.yield();
+ }
+ }
+
+ @Override
+ public void afterExpiryScan(Queue queue) {
+ currentExpiryHappening.decrementAndGet();
+ }
+ });
+
+ Assert.assertTrue(server.hasBrokerQueuePlugins());
+
+ server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setExpiryAddress(expiryAddress));
+ for (int ad = 0; ad < 1000; ad++) {
+ server.addAddressInfo(new AddressInfo("test" + ad));
+ server.createQueue(new QueueConfiguration("test" + ad).setAddress("test" + ad).setRoutingType(RoutingType.ANYCAST));
+ }
+
+ ClientProducer producer = clientSession.createProducer();
+
+ for (int i = 0; i < 1000; i++) {
+ ClientMessage message = clientSession.createMessage(true);
+ message.setExpiration(System.currentTimeMillis());
+ producer.send("test" + i, message);
+ }
+
+ Wait.assertEquals(1000, artemisExpiryQueue::getMessageCount);
+
+ // The system should not run expiry scans in bursts; doing so would consume too many of the broker's own resources
+ Assert.assertTrue("The System had " + maxExpiryHappening + " threads in parallel scanning for expiration", maxExpiryHappening.get() == 1);
+
+ }
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- ConfigurationImpl configuration = (ConfigurationImpl) createDefaultInVMConfig().setMessageExpiryScanPeriod(1000);
+ ConfigurationImpl configuration = (ConfigurationImpl) createDefaultInVMConfig().setMessageExpiryScanPeriod(100);
server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
// start the server
server.start();
@@ -251,7 +286,9 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
AddressSettings addressSettings = new AddressSettings().setExpiryAddress(expiryAddress);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
server.getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
- clientSession.createQueue(new QueueConfiguration(expiryQueue).setAddress(expiryAddress).setDurable(false));
+ clientSession.createQueue(new QueueConfiguration(expiryQueue).setAddress(expiryAddress).setDurable(true));
+ artemisExpiryQueue = server.locateQueue(expiryQueue);
+ Assert.assertNotNull(artemisExpiryQueue);
}
private static class DummyMessageHandler implements Runnable {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 5c182f6..dca155a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -62,6 +62,10 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void expireReferences(Runnable done) {
+ }
+
+ @Override
public PagingStore getPagingStore() {
return null;
}
@@ -503,7 +507,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public void expireReferences() throws Exception {
+ public void expireReferences() {
// no-op
}