You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/10/21 17:21:32 UTC

[activemq-artemis] 01/03: ARTEMIS-3523: Created delegated methods replay in addressControl

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ebf8adc72b261e7e2375f3aa96354af8a0783abd
Author: nbrendah <br...@gmail.com>
AuthorDate: Wed Oct 20 11:00:39 2021 +0300

    ARTEMIS-3523: Created delegated methods replay in addressControl
---
 .../api/core/management/AddressControl.java        |   9 ++
 .../core/management/impl/AddressControlImpl.java   |  19 ++++
 .../integration/management/AddressControlTest.java | 116 +++++++++++++++++++++
 .../management/AddressControlUsingCoreTest.java    |  10 ++
 4 files changed, 154 insertions(+)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index 539988b..beef4c3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -230,4 +230,13 @@ public interface AddressControl {
    @Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION)
    long purge() throws Exception;
 
+   @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
+   void replay(@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
+               @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
+
+   @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
+   void replay(@Parameter(name = "startScanDate", desc = "Start date where we will start scanning for journals to replay. Format YYYYMMDDHHMMSS") String startScan,
+               @Parameter(name = "endScanDate", desc = "Finish date where we will stop scannning for journals to replay. Format YYYYMMDDHHMMSS") String endScan,
+               @Parameter(name = "target", desc = "Where the replay data should be sent") String target,
+               @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception;
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index e384697..6277db9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -19,8 +19,10 @@ package org.apache.activemq.artemis.core.management.impl;
 import javax.json.JsonArrayBuilder;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanOperationInfo;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.replay.ReplayManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.logs.AuditLogger;
 import org.apache.activemq.artemis.utils.JsonLoader;
@@ -574,6 +577,22 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
       return totalMsgs;
    }
 
+   @Override
+   public void replay(String target, String filter) throws Exception {
+      server.replay(null, null, this.getAddress(), target, filter);
+   }
+
+   @Override
+   public void replay(String startScan, String endScan, String target, String filter) throws Exception {
+
+      SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat();
+
+      Date startScanDate = format.parse(startScan);
+      Date endScanDate = format.parse(endScan);
+
+      server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
+   }
+
    // Private -------------------------------------------------------
 
    private long getMessageCount(final DurabilityType durability) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 2095272..a303ad6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -16,10 +16,18 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.json.JsonArray;
 import javax.json.JsonString;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -47,8 +55,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -580,6 +590,111 @@ public class AddressControlTest extends ManagementTestBase {
       Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100);
    }
 
+   @Test
+   public void testReplayWithoutDate() throws Exception {
+      testReplaySimple(false);
+   }
+
+   @Test
+   public void testReplayWithDate() throws Exception {
+      testReplaySimple(true);
+   }
+
+   private void testReplaySimple(boolean useDate) throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+
+      AddressControl addressControl = createManagementControl(address);
+      String queue = "testQueue" + RandomUtil.randomString();
+      server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue jmsQueue = session.createQueue(queue);
+         MessageProducer producer = session.createProducer(jmsQueue);
+         producer.send(session.createTextMessage("before"));
+
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(jmsQueue);
+         Assert.assertNotNull(consumer.receive(5000));
+         Assert.assertNull(consumer.receiveNoWait());
+
+         addressControl.replay(queue, null);
+         Assert.assertNotNull(consumer.receive(5000));
+         Assert.assertNull(consumer.receiveNoWait());
+
+         if (useDate) {
+            addressControl.replay("dontexist", null); // just to force a move next file, and copy stuff into place
+            SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
+            Thread.sleep(1000); // waiting a second just to have the timestamp change
+            String dateEnd = format.format(new Date());
+            Thread.sleep(1000); // waiting a second just to have the timestamp change
+            String dateStart = "19800101000000";
+
+
+            for (int i = 0; i < 100; i++) {
+               producer.send(session.createTextMessage("after receiving"));
+            }
+            for (int i = 0; i < 100; i++) {
+               Assert.assertNotNull(consumer.receive());
+            }
+            Assert.assertNull(consumer.receiveNoWait());
+            addressControl.replay(dateStart, dateEnd, queue, null);
+            for (int i = 0; i < 2; i++) { // replay of the replay will contain two messages
+               TextMessage message = (TextMessage) consumer.receive(5000);
+               Assert.assertNotNull(message);
+               Assert.assertEquals("before", message.getText());
+            }
+            Assert.assertNull(consumer.receiveNoWait());
+         } else {
+            addressControl.replay(queue, null);
+
+            // replay of the replay, there will be two messages
+            for (int i = 0; i < 2; i++) {
+               Assert.assertNotNull(consumer.receive(5000));
+            }
+            Assert.assertNull(consumer.receiveNoWait());
+         }
+      }
+   }
+
+   @Test
+   public void testReplayFilter() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+
+      AddressControl addressControl = createManagementControl(address);
+      String queue = "testQueue" + RandomUtil.randomString();
+      server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue jmsQueue = session.createQueue(queue);
+         MessageProducer producer = session.createProducer(jmsQueue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(jmsQueue);
+         for (int i = 0; i < 10; i++) {
+            Assert.assertNotNull(consumer.receive(5000));
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+
+         addressControl.replay(queue, "i=5");
+         TextMessage message = (TextMessage)consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(5, message.getIntProperty("i"));
+         Assert.assertEquals("message 5", message.getText());
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -607,6 +722,7 @@ public class AddressControlTest extends ManagementTestBase {
 
    // Private -------------------------------------------------------
 
+
    // Inner classes -------------------------------------------------
 
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
index c512a44..738b47d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
@@ -179,6 +179,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
          }
 
          @Override
+         public void replay(String startScan, String endScan, String target, String filter) throws Exception {
+            proxy.invokeOperation("replay", startScan, endScan, target, filter);
+         }
+
+         @Override
+         public void replay(String target, String filter) throws Exception {
+            proxy.invokeOperation("replay", target, filter);
+         }
+
+         @Override
          public String sendMessage(Map<String, String> headers,
                                    int type,
                                    String body,