You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/10/21 17:21:32 UTC
[activemq-artemis] 01/03: ARTEMIS-3523: Created delegated methods
replay in addressControl
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit ebf8adc72b261e7e2375f3aa96354af8a0783abd
Author: nbrendah <br...@gmail.com>
AuthorDate: Wed Oct 20 11:00:39 2021 +0300
ARTEMIS-3523: Created delegated methods replay in addressControl
---
.../api/core/management/AddressControl.java | 9 ++
.../core/management/impl/AddressControlImpl.java | 19 ++++
.../integration/management/AddressControlTest.java | 116 +++++++++++++++++++++
.../management/AddressControlUsingCoreTest.java | 10 ++
4 files changed, 154 insertions(+)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index 539988b..beef4c3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -230,4 +230,13 @@ public interface AddressControl {
@Operation(desc = "Purges the queues bound to this address. Returns the total number of messages purged.", impact = MBeanOperationInfo.ACTION)
long purge() throws Exception;
+ @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
+ void replay(@Parameter(name = "target", desc = "Where the replay data should be sent") String target,
+ @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception; // overload without scan dates; the four-argument replay adds startScanDate/endScanDate
+
+ @Operation(desc = "Makes the broker to read messages from the retention folder matching the address and filter.", impact = MBeanOperationInfo.ACTION)
+ void replay(@Parameter(name = "startScanDate", desc = "Start date where we will start scanning for journals to replay. Format YYYYMMDDHHMMSS") String startScan,
+ @Parameter(name = "endScanDate", desc = "Finish date where we will stop scanning for journals to replay. Format YYYYMMDDHHMMSS") String endScan,
+ @Parameter(name = "target", desc = "Where the replay data should be sent") String target,
+ @Parameter(name = "filter", desc = "Filter to apply on message selection. Null means everything matching the address") String filter) throws Exception; // date-bounded overload: startScan/endScan are date strings in the format named in their descriptions
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index e384697..6277db9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -19,8 +19,10 @@ package org.apache.activemq.artemis.core.management.impl;
import javax.json.JsonArrayBuilder;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -48,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.replay.ReplayManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.utils.JsonLoader;
@@ -574,6 +577,22 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
return totalMsgs;
}
+ @Override
+ public void replay(String target, String filter) throws Exception {
+ server.replay(null, null, this.getAddress(), target, filter); // null start/end dates: this overload supplies no scan bounds; the source address is this control's own address
+ }
+
+ @Override
+ public void replay(String startScan, String endScan, String target, String filter) throws Exception {
+ // Parses the scan bounds with the retention date format and replays this control's address into target.
+ SimpleDateFormat format = ReplayManager.newRetentionSimpleDateFormat();
+ // A null bound is forwarded as null (the value the two-argument overload passes) instead of failing inside parse().
+ Date startScanDate = startScan == null ? null : format.parse(startScan);
+ Date endScanDate = endScan == null ? null : format.parse(endScan);
+ // NOTE(review): assumes server.replay also accepts a single null bound (only start or only end) - confirm.
+ server.replay(startScanDate, endScanDate, this.getAddress(), target, filter);
+ }
+
// Private -------------------------------------------------------
private long getMessageCount(final DurabilityType durability) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 2095272..a303ad6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -16,10 +16,18 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.json.JsonArray;
import javax.json.JsonString;
+import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.EnumSet;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -47,8 +55,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -580,6 +590,111 @@ public class AddressControlTest extends ManagementTestBase {
Wait.assertEquals(0L, () -> addressControl.getMessageCount(), 2000, 100);
}
+ @Test
+ public void testReplayWithoutDate() throws Exception {
+ testReplaySimple(false); // false: only the replay(target, filter) overload is exercised
+ }
+
+ @Test
+ public void testReplayWithDate() throws Exception {
+ testReplaySimple(true); // true: additionally exercises replay(startScan, endScan, target, filter)
+ }
+
+ private void testReplaySimple(boolean useDate) throws Exception { // shared replay scenario; useDate selects the date-bounded replay branch
+ SimpleString address = RandomUtil.randomSimpleString();
+ // NOTE(review): the control is created for the random 'address', while the queue and its messages use 'queue' as their address - confirm this is intended.
+ AddressControl addressControl = createManagementControl(address);
+ String queue = "testQueue" + RandomUtil.randomString();
+ server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
+ // Send a single "before" message and drain it, leaving the queue empty before any replay is requested.
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue = session.createQueue(queue);
+ MessageProducer producer = session.createProducer(jmsQueue);
+ producer.send(session.createTextMessage("before"));
+
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+ Assert.assertNotNull(consumer.receive(5000));
+ Assert.assertNull(consumer.receiveNoWait());
+ // First replay into the same queue: exactly one message is expected back.
+ addressControl.replay(queue, null);
+ Assert.assertNotNull(consumer.receive(5000));
+ Assert.assertNull(consumer.receiveNoWait());
+
+ if (useDate) {
+ addressControl.replay("dontexist", null); // just to force a move next file, and copy stuff into place
+ SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
+ Thread.sleep(1000); // waiting a second just to have the timestamp change
+ String dateEnd = format.format(new Date());
+ Thread.sleep(1000); // waiting a second just to have the timestamp change
+ String dateStart = "19800101000000";
+ // dateEnd is captured before the next 100 messages are sent; the assertions below expect them to be absent from the dated replay.
+
+ for (int i = 0; i < 100; i++) {
+ producer.send(session.createTextMessage("after receiving"));
+ }
+ for (int i = 0; i < 100; i++) {
+ Assert.assertNotNull(consumer.receive(5000)); // bounded wait, like the other receives, so a lost message fails the test instead of hanging it
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ addressControl.replay(dateStart, dateEnd, queue, null);
+ for (int i = 0; i < 2; i++) { // replay of the replay will contain two messages
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("before", message.getText());
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ } else {
+ addressControl.replay(queue, null);
+
+ // replay of the replay, there will be two messages
+ for (int i = 0; i < 2; i++) {
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+ }
+
+ @Test
+ public void testReplayFilter() throws Exception { // checks that replay(target, filter) re-delivers only the message matching the filter
+ SimpleString address = RandomUtil.randomSimpleString();
+ // NOTE(review): the control is created for the random 'address', while the queue and its messages use 'queue' as their address - confirm this is intended.
+ AddressControl addressControl = createManagementControl(address);
+ String queue = "testQueue" + RandomUtil.randomString();
+ server.addAddressInfo(new AddressInfo(queue).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST).setAddress(queue));
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue = session.createQueue(queue);
+ MessageProducer producer = session.createProducer(jmsQueue);
+ for (int i = 0; i < 10; i++) {
+ TextMessage message = session.createTextMessage("message " + i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ // Drain all ten messages so the queue is empty before the filtered replay.
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+ for (int i = 0; i < 10; i++) {
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ // Replay with the filter "i=5": the assertions below expect exactly one message, the one whose property i is 5.
+ addressControl.replay(queue, "i=5");
+ TextMessage message = (TextMessage)consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(5, message.getIntProperty("i"));
+ Assert.assertEquals("message 5", message.getText());
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -607,6 +722,7 @@ public class AddressControlTest extends ManagementTestBase {
// Private -------------------------------------------------------
+
// Inner classes -------------------------------------------------
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
index c512a44..738b47d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java
@@ -179,6 +179,16 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
}
@Override
+ public void replay(String startScan, String endScan, String target, String filter) throws Exception {
+ proxy.invokeOperation("replay", startScan, endScan, target, filter); // invokes the "replay" operation through the proxy, arguments in the same order as this signature
+ }
+
+ @Override
+ public void replay(String target, String filter) throws Exception {
+ proxy.invokeOperation("replay", target, filter); // two-argument form of the "replay" operation, invoked through the proxy
+ }
+
+ @Override
public String sendMessage(Map<String, String> headers,
int type,
String body,