You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/15 17:52:06 UTC

[activemq-artemis] branch main updated: ARTEMIS-3386 Expiry messages using too many threads

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b4aef3f  ARTEMIS-3386 Expiry messages using too many threads
b4aef3f is described below

commit b4aef3fca85167aa94b25bc02ccc899fc3bcde0e
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Jul 14 17:21:28 2021 -0400

    ARTEMIS-3386 Expiry messages using too many threads
---
 .../org/apache/activemq/artemis/utils/Wait.java    |  2 +-
 .../core/postoffice/impl/PostOfficeImpl.java       | 11 +++-
 .../apache/activemq/artemis/core/server/Queue.java |  6 +-
 .../artemis/core/server/impl/QueueImpl.java        | 63 ++++++++++++++----
 .../server/plugin/ActiveMQServerQueuePlugin.java   | 14 ++++
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  7 +-
 .../integration/client/ExpiryAddressTest.java      | 51 +++++++++++++++
 .../tests/integration/server/ExpiryRunnerTest.java | 75 ++++++++++++++++------
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  6 +-
 9 files changed, 197 insertions(+), 38 deletions(-)

diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
index 50596e6..cd3c0c3 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
@@ -66,7 +66,7 @@ public class Wait {
    }
 
    public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis) throws Exception {
-      assertEquals(size, condition, timeout, sleepMillis, true);
+      assertEquals(size, condition, timeout, sleepMillis, false);
    }
 
    public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis, boolean printThreadDump) throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index acb897c..7fbd3f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -30,9 +30,11 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Stream;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@@ -1824,7 +1826,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          // This is to avoid leaks on PostOffice between stops and starts
          for (Queue queue : iterableOf(getLocalQueues())) {
             try {
-               queue.expireReferences();
+               CountDownLatch latch = new CountDownLatch(1);
+               queue.expireReferences(latch::countDown);
+               // The idea is to block the Reaper while the Queue is executing its expiry scan.
+               // This prevents another expiry scan from being triggered if the reaping period is too small.
+               // It should also avoid bursts in CPU consumption caused by the expiry reaping.
+               if (!latch.await(10, TimeUnit.SECONDS)) {
+                  ActiveMQServerLogger.LOGGER.errorExpiringMessages(new TimeoutException(queue.getName().toString()));
+               }
             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 193d0ed..af9a50e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -325,7 +325,11 @@ public interface Queue extends Bindable,CriticalComponent {
     */
    int expireReferences(Filter filter) throws Exception;
 
-   void expireReferences() throws Exception;
+   default void expireReferences() {
+      expireReferences((Runnable)null);
+   }
+
+   void expireReferences(Runnable done);
 
    void expire(MessageReference ref) throws Exception;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d97891d..37af7e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2380,14 +2380,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void expireReferences() {
+   public void expireReferences(Runnable done) {
       if (isExpirationRedundant()) {
+         if (done != null) {
+            done.run();
+         }
          return;
       }
 
+
       if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
-         expiryScanner.scannerRunning.incrementAndGet();
+         if (expiryScanner.scannerRunning.incrementAndGet() == 1) {
+            expiryScanner.doneCallback = done;
+         }
          getExecutor().execute(expiryScanner);
+      } else {
+         // expire is already happening on this queue, move on!
+         if (done != null) {
+            done.run();
+         }
       }
    }
 
@@ -2405,13 +2416,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    class ExpiryScanner implements Runnable {
 
+      public Runnable doneCallback;
       public AtomicInteger scannerRunning = new AtomicInteger(0);
+      LinkedListIterator<MessageReference> iter = null;
 
       @Override
       public void run() {
 
          boolean expired = false;
          boolean hasElements = false;
+         int elementsIterated = 0;
          int elementsExpired = 0;
 
          LinkedList<MessageReference> expiredMessages = new LinkedList<>();
@@ -2424,32 +2438,54 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                logger.debug("Scanning for expires on " + QueueImpl.this.getName());
             }
 
-            LinkedListIterator<MessageReference> iter = iterator();
+            if (iter == null) {
+               if (server.hasBrokerQueuePlugins()) {
+                  try {
+                     server.callBrokerQueuePlugins((p) -> p.beforeExpiryScan(QueueImpl.this));
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+               }
+               iter = iterator();
+            }
 
             try {
                while (postOffice.isStarted() && iter.hasNext()) {
                   hasElements = true;
                   MessageReference ref = iter.next();
                   if (ref.getMessage().isExpired()) {
+                     elementsExpired++;
                      incDelivering(ref);
                      expired = true;
                      expiredMessages.add(ref);
                      iter.remove();
-
-                     if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
-                        logger.debug("Breaking loop of expiring");
-                        scannerRunning.incrementAndGet();
-                        getExecutor().execute(this);
-                        break;
-                     }
+                  }
+                  if (++elementsIterated >= MAX_DELIVERIES_IN_LOOP) {
+                     logger.debug("Breaking loop of expiring");
+                     scannerRunning.incrementAndGet();
+                     getExecutor().execute(this);
+                     break;
                   }
                }
             } finally {
-               try {
+               if (scannerRunning.decrementAndGet() == 0) {
+                  if (server.hasBrokerQueuePlugins()) {
+                     try {
+                        server.callBrokerQueuePlugins((p) -> p.afterExpiryScan(QueueImpl.this));
+                     } catch (Exception e) {
+                        logger.warn(e.getMessage(), e);
+                     }
+                  }
+
                   iter.close();
-               } catch (Throwable ignored) {
+                  iter = null;
+
+                  if (doneCallback != null) {
+                     doneCallback.run();
+                     doneCallback = null;
+                  }
                }
-               scannerRunning.decrementAndGet();
+
                logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
 
             }
@@ -2473,7 +2509,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
             logger.debug("Expired " + elementsExpired + " references");
 
-
          }
 
          // If empty we need to schedule depaging to make sure we would depage expired messages as well
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java
index 0852bd1..2e69b50 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerQueuePlugin.java
@@ -109,4 +109,18 @@ public interface ActiveMQServerQueuePlugin extends ActiveMQServerBasePlugin {
                                   boolean removeConsumers, boolean autoDeleteAddress) throws ActiveMQException {
 
    }
+
+   /**
+    * To be called before starting expiry scan on the queue
+    * @param queue
+    */
+   default void beforeExpiryScan(Queue queue) {
+   }
+
+   /**
+    * To be called after the expiry scan on the queue has finished
+    * @param queue
+    */
+   default void afterExpiryScan(Queue queue) {
+   }
 }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 607670a..a951626 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -864,6 +864,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void expireReferences(Runnable done) {
+
+      }
+
+      @Override
       public void refDown(MessageReference messageReference) {
 
       }
@@ -1401,7 +1406,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void expireReferences() throws Exception {
+      public void expireReferences() {
 
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java
index cffc2ac..17e1ac3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryAddressTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -27,6 +30,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -74,6 +79,52 @@ public class ExpiryAddressTest extends ActiveMQTestBase {
       m.acknowledge();
    }
 
+
+   @Test
+   public void testExpireSingleMessage() throws Exception {
+      SimpleString ea = new SimpleString("EA");
+      SimpleString adSend = new SimpleString("a1");
+      SimpleString qName = new SimpleString("q1");
+      SimpleString eq = new SimpleString("EA1");
+      AddressSettings addressSettings = new AddressSettings().setExpiryAddress(ea);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+      clientSession.createQueue(new QueueConfiguration(eq).setAddress(ea).setDurable(false));
+      clientSession.createQueue(new QueueConfiguration(qName).setAddress(adSend).setDurable(false));
+
+
+      ClientProducer producer = clientSession.createProducer(adSend);
+
+      for (int i = 0; i < QueueImpl.MAX_DELIVERIES_IN_LOOP * 2 + 100; i++) {
+         ClientMessage clientMessage = createTextMessage(clientSession, "notExpired!");
+         clientMessage.putIntProperty("i", i);
+         producer.send(clientMessage);
+      }
+
+      ClientMessage clientMessage = createTextMessage(clientSession, "heyho!");
+      clientMessage.setExpiration(System.currentTimeMillis());
+      producer.send(clientMessage);
+
+      Queue queueQ1 = server.locateQueue("q1");
+
+      CountDownLatch latch = new CountDownLatch(10);
+      for (int i = 0; i < 10; i++) {
+         // done should be called even for the ones that are ignored because the expiry is already running
+         queueQ1.expireReferences(latch::countDown);
+      }
+
+      Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(eq);
+      ClientMessage m = clientConsumer.receive(5000);
+      Assert.assertNotNull(m);
+      Assert.assertEquals(qName.toString(), m.getStringProperty(Message.HDR_ORIGINAL_QUEUE));
+      Assert.assertEquals(adSend.toString(), m.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
+      Assert.assertNotNull(m);
+      Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
+      m.acknowledge();
+   }
+
    @Test
    public void testBasicSendWithRetroActiveAddressSettings() throws Exception {
       // apply "original" address settings
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java
index a07108e..fa56e22 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ExpiryRunnerTest.java
@@ -20,8 +20,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -33,8 +35,11 @@ import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,6 +57,9 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
    private SimpleString expiryQueue;
 
    private SimpleString expiryAddress;
+
+   Queue artemisExpiryQueue;
+
    private ServerLocator locator;
 
    @Test
@@ -212,30 +220,57 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
       thr.join();
    }
 
-   //
-   //   public static void main(final String[] args) throws Exception
-   //   {
-   //      for (int i = 0; i < 1000; i++)
-   //      {
-   //         TestSuite suite = new TestSuite();
-   //         ExpiryRunnerTest expiryRunnerTest = new ExpiryRunnerTest();
-   //         expiryRunnerTest.setName("testExpireWhilstConsuming");
-   //         suite.addTest(expiryRunnerTest);
-   //
-   //         TestResult result = TestRunner.run(suite);
-   //         if (result.errorCount() > 0 || result.failureCount() > 0)
-   //         {
-   //            System.exit(1);
-   //         }
-   //      }
-   //   }
+
+   @Test
+   public void testManyQueuesExpire() throws Exception {
+
+      AtomicInteger currentExpiryHappening = new AtomicInteger();
+      AtomicInteger maxExpiryHappening = new AtomicInteger(0);
+
+      server.registerBrokerPlugin(new ActiveMQServerQueuePlugin() {
+         @Override
+         public void beforeExpiryScan(Queue queue) {
+            currentExpiryHappening.incrementAndGet();
+            while (!maxExpiryHappening.compareAndSet(maxExpiryHappening.get(), Math.max(maxExpiryHappening.get(), currentExpiryHappening.get()))) {
+               Thread.yield();
+            }
+         }
+
+         @Override
+         public void afterExpiryScan(Queue queue) {
+            currentExpiryHappening.decrementAndGet();
+         }
+      });
+
+      Assert.assertTrue(server.hasBrokerQueuePlugins());
+
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setExpiryAddress(expiryAddress));
+      for (int ad = 0; ad < 1000; ad++) {
+         server.addAddressInfo(new AddressInfo("test" + ad));
+         server.createQueue(new QueueConfiguration("test" + ad).setAddress("test" + ad).setRoutingType(RoutingType.ANYCAST));
+      }
+
+      ClientProducer producer = clientSession.createProducer();
+
+      for (int i = 0; i < 1000; i++) {
+         ClientMessage message = clientSession.createMessage(true);
+         message.setExpiration(System.currentTimeMillis());
+         producer.send("test" + i, message);
+      }
+
+      Wait.assertEquals(1000, artemisExpiryQueue::getMessageCount);
+
+      // The system should not scan many queues for expiration in parallel; that would use too many of the broker's own resources
+      Assert.assertTrue("The System had " + maxExpiryHappening + " threads in parallel scanning for expiration", maxExpiryHappening.get() == 1);
+
+   }
 
    @Override
    @Before
    public void setUp() throws Exception {
       super.setUp();
 
-      ConfigurationImpl configuration = (ConfigurationImpl) createDefaultInVMConfig().setMessageExpiryScanPeriod(1000);
+      ConfigurationImpl configuration = (ConfigurationImpl) createDefaultInVMConfig().setMessageExpiryScanPeriod(100);
       server = addServer(ActiveMQServers.newActiveMQServer(configuration, false));
       // start the server
       server.start();
@@ -251,7 +286,9 @@ public class ExpiryRunnerTest extends ActiveMQTestBase {
       AddressSettings addressSettings = new AddressSettings().setExpiryAddress(expiryAddress);
       server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
       server.getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
-      clientSession.createQueue(new QueueConfiguration(expiryQueue).setAddress(expiryAddress).setDurable(false));
+      clientSession.createQueue(new QueueConfiguration(expiryQueue).setAddress(expiryAddress).setDurable(true));
+      artemisExpiryQueue = server.locateQueue(expiryQueue);
+      Assert.assertNotNull(artemisExpiryQueue);
    }
 
    private static class DummyMessageHandler implements Runnable {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 5c182f6..dca155a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -62,6 +62,10 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void expireReferences(Runnable done) {
+   }
+
+   @Override
    public PagingStore getPagingStore() {
       return null;
    }
@@ -503,7 +507,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void expireReferences() throws Exception {
+   public void expireReferences() {
       // no-op
 
    }