You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/08/25 15:25:41 UTC

[activemq-artemis] branch main updated: ARTEMIS-3872 send scheduled msg immediately via mgmnt

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 63966dcc56 ARTEMIS-3872 send scheduled msg immediately via mgmnt
     new 73e57e5ea8 This closes #4120
63966dcc56 is described below

commit 63966dcc561d8237d5206dfd4f9c9263166857f9
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Aug 25 08:43:50 2022 -0500

    ARTEMIS-3872 send scheduled msg immediately via mgmnt
---
 .../apache/activemq/artemis/logs/AuditLogger.java  |  8 +++
 .../artemis/api/core/management/QueueControl.java  | 12 ++++
 .../core/management/impl/QueueControlImpl.java     | 27 +++++++
 .../apache/activemq/artemis/core/server/Queue.java | 10 +++
 .../core/server/ScheduledDeliveryHandler.java      |  4 +-
 .../artemis/core/server/impl/QueueImpl.java        | 18 ++++-
 .../server/impl/ScheduledDeliveryHandlerImpl.java  |  6 +-
 .../core/server/impl/RoutingContextTest.java       | 10 +++
 .../server/impl/ScheduledDeliveryHandlerTest.java  | 10 +++
 .../management/QueueControlUsingCoreTest.java      | 18 +++++
 .../scheduling/ScheduledMessageTest.java           | 84 ++++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java | 11 +++
 12 files changed, 211 insertions(+), 7 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 481a5945be..5bcdad238a 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2976,4 +2976,12 @@ public interface AuditLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.INFO)
    @Message(id = 601763, value = "User {0} is remove a connector on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
    void removeConnector(String user, Object source, Object... args);
+
+   static void deliverScheduledMessage(Object source, Object... args) {
+      BASE_LOGGER.deliverScheduledMessage(getCaller(), source, arrayToString(args));
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601764, value = "User {0} is calling deliverScheduledMessage on queue: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+   void deliverScheduledMessage(String user, Object source, Object... args);
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index a3b132637e..98548438c1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -770,4 +770,16 @@ public interface QueueControl {
     */
    @Attribute(desc = "return how many messages are stuck in prepared transactions")
    int getPreparedTransactionMessageCount();
+
+   /**
+    * Immediately deliver the scheduled messages which match the filter.
+    */
+   @Operation(desc = "Immediately deliver the scheduled messages which match the filter", impact = MBeanOperationInfo.ACTION)
+   void deliverScheduledMessages(@Parameter(name = "filter", desc = "filter to match messages to deliver") String filter) throws Exception;
+
+   /**
+    * Immediately deliver the scheduled message with the specified message ID.
+    */
+   @Operation(desc = "Immediately deliver the scheduled message with the specified message ID", impact = MBeanOperationInfo.ACTION)
+   void deliverScheduledMessage(@Parameter(name = "messageID", desc = "ID of the message to deliver") long messageId) throws Exception;
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index b653f8597f..748e001d51 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1962,8 +1962,35 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       }
    }
 
+   @Override
+   public void deliverScheduledMessages(String filter) throws Exception {
+      if (AuditLogger.isBaseLoggingEnabled()) {
+         AuditLogger.deliverScheduledMessage(queue, filter);
+      }
+      checkStarted();
 
+      clearIO();
+      try {
+         queue.deliverScheduledMessages(filter);
+      } finally {
+         blockOnIO();
+      }
+   }
 
+   @Override
+   public void deliverScheduledMessage(long messageId) throws Exception {
+      if (AuditLogger.isBaseLoggingEnabled()) {
+         AuditLogger.deliverScheduledMessage(queue, messageId);
+      }
+      checkStarted();
+
+      clearIO();
+      try {
+         queue.deliverScheduledMessage(messageId);
+      } finally {
+         blockOnIO();
+      }
+   }
 
    private void checkStarted() {
       if (!server.getPostOffice().isStarted()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 67e2b0b31c..cc99a18a7a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -504,6 +504,16 @@ public interface Queue extends Bindable,CriticalComponent {
     */
    void deliverScheduledMessages() throws ActiveMQException;
 
+   /**
+    * cancels the scheduled messages which match the filter and sends them to the head of the queue.
+    */
+   void deliverScheduledMessages(String filter) throws ActiveMQException;
+
+   /**
+    * cancels the scheduled message with the corresponding message ID and sends it to the head of the queue.
+    */
+   void deliverScheduledMessage(long messageId) throws ActiveMQException;
+
    void postAcknowledge(MessageReference ref, AckReason reason);
 
    void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
index 898480e477..f419ae8d7c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
@@ -17,9 +17,9 @@
 package org.apache.activemq.artemis.core.server;
 
 import java.util.List;
+import java.util.function.Predicate;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 
 public interface ScheduledDeliveryHandler {
@@ -36,7 +36,7 @@ public interface ScheduledDeliveryHandler {
 
    List<MessageReference> getScheduledReferences();
 
-   List<MessageReference> cancel(Filter filter) throws ActiveMQException;
+   List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException;
 
    MessageReference removeReferenceWithID(long id) throws Exception;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 61f9a4655c..e64990642c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2035,7 +2035,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void deliverScheduledMessages() throws ActiveMQException {
-      List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
+      internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> true));
+   }
+
+   @Override
+   public void deliverScheduledMessages(String filterString) throws ActiveMQException {
+      final Filter filter = filterString == null || filterString.length() == 0 ? null : FilterImpl.createFilter(filterString);
+      internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> filter == null ? true : filter.match(ref.getMessage())));
+   }
+
+   @Override
+   public void deliverScheduledMessage(long messageId) throws ActiveMQException {
+      internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> ref.getMessageID() == messageId));
+   }
+
+   private void internalDeliverScheduleMessages(List<MessageReference> scheduledMessages) {
       if (scheduledMessages != null && scheduledMessages.size() > 0) {
          for (MessageReference ref : scheduledMessages) {
             ref.getMessage().setScheduledDeliveryTime(ref.getScheduledDeliveryTime());
@@ -2170,7 +2184,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   txCount = 0;
                }
 
-               List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
+               List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1.match(ref.getMessage()));
                for (MessageReference messageReference : cancelled) {
                   messageAction.actMessage(tx, messageReference);
                   count++;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index f663e60a8f..8d0b4d7f04 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -27,9 +27,9 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
@@ -117,7 +117,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
    }
 
    @Override
-   public List<MessageReference> cancel(final Filter filter) throws ActiveMQException {
+   public List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException {
       List<MessageReference> refs = new ArrayList<>();
 
       synchronized (scheduledReferences) {
@@ -125,7 +125,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
 
          while (iter.hasNext()) {
             MessageReference ref = iter.next().getRef();
-            if (filter == null || filter.match(ref.getMessage())) {
+            if (predicate.test(ref)) {
                iter.remove();
                refs.add(ref);
                metrics.decrementMetrics(ref);
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
index ca540cceaa..438075cc83 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
@@ -895,6 +895,16 @@ public class RoutingContextTest {
 
       }
 
+      @Override
+      public void deliverScheduledMessages(String filter) throws ActiveMQException {
+
+      }
+
+      @Override
+      public void deliverScheduledMessage(long messageId) throws ActiveMQException {
+
+      }
+
       @Override
       public void postAcknowledge(MessageReference ref, AckReason reason) {
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index b0397b8dbd..f49b81d337 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1623,6 +1623,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
 
       }
 
+      @Override
+      public void deliverScheduledMessages(String filter) throws ActiveMQException {
+
+      }
+
+      @Override
+      public void deliverScheduledMessage(long messageId) throws ActiveMQException {
+
+      }
+
       @Override
       public void route(Message message, RoutingContext context) throws Exception {
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index be9b65cc08..a75a8e0764 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -58,6 +58,24 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
             }
          }
 
+         @Override
+         public void deliverScheduledMessages(String filter) throws Exception {
+            try {
+               proxy.invokeOperation("deliverScheduledMessages", filter);
+            } catch (Exception e) {
+               throw new RuntimeException(e.getMessage(), e);
+            }
+         }
+
+         @Override
+         public void deliverScheduledMessage(long messageId) throws Exception {
+            try {
+               proxy.invokeOperation("deliverScheduledMessage", messageId);
+            } catch (Exception e) {
+               throw new RuntimeException(e.getMessage(), e);
+            }
+         }
+
          @Override
          public void resetAllGroups() {
             try {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java
index 29b7eb60bf..956df30e93 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/scheduling/ScheduledMessageTest.java
@@ -30,12 +30,15 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.Assert;
 import org.junit.Before;
@@ -505,6 +508,87 @@ public class ScheduledMessageTest extends ActiveMQTestBase {
       session.close();
    }
 
+   @Test
+   public void testManagementDeliveryById() throws Exception {
+
+      ClientSessionFactory sessionFactory = createSessionFactory(locator);
+      ClientSession session = sessionFactory.createSession(false, false, false);
+      session.createQueue(new QueueConfiguration(atestq));
+      ClientProducer producer = session.createProducer(atestq);
+      long time = System.currentTimeMillis();
+      time += 999_999_999;
+
+      ClientMessage messageToSend = session.createMessage(true);
+      messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
+      producer.send(messageToSend);
+
+      session.commit();
+
+      session.start();
+      ClientConsumer consumer = session.createConsumer(atestq);
+      ClientMessage message = consumer.receive(500);
+      assertNull(message);
+
+      QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
+      queueControl.deliverScheduledMessage((long) queueControl.listScheduledMessages()[0].get("messageID"));
+
+      message = consumer.receive(500);
+      assertNotNull(message);
+      message.acknowledge();
+
+      session.commit();
+
+      Assert.assertNull(consumer.receiveImmediate());
+
+      session.close();
+   }
+
+   @Test
+   public void testManagementDeliveryByFilter() throws Exception {
+      final String propertyValue = RandomUtil.randomString();
+      final String propertyName = "X" + RandomUtil.randomString().replace("-","");
+      ClientSessionFactory sessionFactory = createSessionFactory(locator);
+      ClientSession session = sessionFactory.createSession(false, false, false);
+      session.createQueue(new QueueConfiguration(atestq));
+      ClientProducer producer = session.createProducer(atestq);
+      long time = System.currentTimeMillis();
+      time += 999_999_999;
+
+      ClientMessage messageToSend = session.createMessage(true);
+      messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
+      messageToSend.putStringProperty(propertyName, propertyValue);
+      producer.send(messageToSend);
+
+      messageToSend = session.createMessage(true);
+      messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
+      messageToSend.putStringProperty(propertyName, propertyValue);
+      producer.send(messageToSend);
+
+      session.commit();
+
+      session.start();
+      ClientConsumer consumer = session.createConsumer(atestq);
+      ClientMessage message = consumer.receive(500);
+      assertNull(message);
+
+      QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
+      queueControl.deliverScheduledMessages(propertyName + " = '" + propertyValue + "'");
+
+      message = consumer.receive(500);
+      assertNotNull(message);
+      message.acknowledge();
+
+      message = consumer.receive(500);
+      assertNotNull(message);
+      message.acknowledge();
+
+      session.commit();
+
+      Assert.assertNull(consumer.receiveImmediate());
+
+      session.close();
+   }
+
    public void testScheduledAndNormalMessagesDeliveredCorrectly(final boolean recover) throws Exception {
 
       ClientSessionFactory sessionFactory = createSessionFactory(locator);
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 10299cb05f..dd910810ea 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -678,6 +679,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
 
    }
 
+   @Override
+   public void deliverScheduledMessages(String filter) throws ActiveMQException {
+
+   }
+
+   @Override
+   public void deliverScheduledMessage(long messageId) throws ActiveMQException {
+
+   }
+
    @Override
    public SimpleString getName() {
       return name;