You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2021/10/08 13:09:35 UTC
[activemq-artemis] branch main updated: ARTEMIS-3313 routing-type
conflict during import/export
This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c3d93f5 ARTEMIS-3313 routing-type conflict during import/export
new 7109d0d This closes #3782
c3d93f5 is described below
commit c3d93f5590b97ab31a4a46fd324b01eb4548940c
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Oct 7 20:13:38 2021 -0500
ARTEMIS-3313 routing-type conflict during import/export
---
.../apache/activemq/artemis/api/core/Message.java | 8 ++
.../core/postoffice/impl/PostOfficeImpl.java | 2 +
.../artemis/core/server/impl/QueueImpl.java | 17 ++-
.../integration/management/QueueControlTest.java | 118 +++++++++++++++++++++
.../tests/integration/server/ScaleDownTest.java | 61 +++++++++++
5 files changed, 202 insertions(+), 4 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index aba66d6..5525fee 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -162,6 +162,11 @@ public interface Message {
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
/**
+ * The original routing type of a message before getting transferred through DLQ or expiry
+ */
+ SimpleString HDR_ORIG_ROUTING_TYPE = new SimpleString("_AMQ_ORIG_ROUTING_TYPE");
+
+ /**
* The time at which the message arrived at the broker.
*/
SimpleString HDR_INGRESS_TIMESTAMP = new SimpleString("_AMQ_INGRESS_TIMESTAMP");
@@ -465,6 +470,9 @@ public interface Message {
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
+ if (original.getRoutingType() != null) {
+ setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
+ }
// reset expiry
setExpiration(0);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 3e51f23..feaa7b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1214,6 +1214,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
+ message.setRoutingType(null);
+
message.reencode();
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index fef2d4b..af3766d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2688,8 +2688,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
+ Binding binding = null;
- if (originalMessageAddress != null) {
+ if (originalMessageQueue != null) {
+ binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
+ }
+
+ if (originalMessageAddress != null && binding != null) {
incDelivering(ref);
@@ -2697,9 +2702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
targetQueue = queues.get(originalMessageQueue);
if (targetQueue == null) {
- Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
-
- if (binding != null && binding instanceof LocalQueueBinding) {
+ if (binding instanceof LocalQueueBinding) {
targetQueue = ((LocalQueueBinding) binding).getID();
queues.put(originalMessageQueue, targetQueue);
}
@@ -2715,6 +2718,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
+ ActiveMQServerLogger.LOGGER.unableToFindTargetQueue(originalMessageQueue);
return false;
}
});
@@ -3387,6 +3391,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copyMessage.setAddress(toAddress);
+ if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) != null) {
+ copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE)));
+ }
+
if (queueIDs != null && queueIDs.length > 0) {
ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
for (long id : queueIDs) {
@@ -3543,6 +3551,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
copy.setExpiration(0);
+ copy.setRoutingType(null);
if (expiry) {
copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 9260971..d45bcff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -3637,6 +3637,124 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
+ /**
+ * Test retry - get a message from auto-created DLA/DLQ with HDR_ORIG_RoutingType set and put on original queue.
+ */
+ @Test
+ public void testRetryMessageWithAutoCreatedResourcesAndOrigRoutingType() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString qName = new SimpleString("q1");
+ final SimpleString adName = new SimpleString("ad1");
+ final String sampleText = "Put me on DLQ";
+
+ AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
+ final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
+
+ server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
+ session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
+
+ // Send message to queue.
+ ClientProducer producer = session.createProducer(adName);
+ ClientMessage m = createTextMessage(session, sampleText);
+
+ // Set ORIG RoutingType header
+ m.putByteProperty(Message.HDR_ORIG_ROUTING_TYPE, (byte) 1);
+ producer.send(m);
+ session.start();
+
+ ClientConsumer clientConsumer = session.createConsumer(qName);
+ ClientMessage clientMessage = clientConsumer.receive(500);
+ clientMessage.acknowledge();
+ Assert.assertNotNull(clientMessage);
+
+ Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+
+ // force a rollback to DLQ
+ session.rollback();
+ clientMessage = clientConsumer.receiveImmediate();
+ Assert.assertNull(clientMessage);
+
+ QueueControl queueControl = createManagementControl(dla, dlq);
+ assertMessageMetrics(queueControl, 1, true);
+ final long messageID = getFirstMessageId(queueControl);
+
+ // Retry the message - i.e. it should go from DLQ to original Queue.
+ Assert.assertTrue(queueControl.retryMessage(messageID));
+
+ // Assert DLQ is empty...
+ Assert.assertEquals(0, getMessageCount(queueControl));
+ assertMessageMetrics(queueControl, 0, durable);
+
+ // .. and that the message is now on the original queue with ORIG RoutingType set as RoutingType
+ clientMessage = clientConsumer.receive(500);
+ clientMessage.acknowledge();
+ assertTrue(clientMessage.getRoutingType() == RoutingType.ANYCAST);
+ Assert.assertNotNull(clientMessage);
+
+ Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+ clientConsumer.close();
+ }
+
+ /**
+ * Test retry - get a message from auto-created DLA/DLQ and put on original queue.
+ */
+ @Test
+ public void testRetryMessageReturnedWhenNoOrigQueue() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString qName = new SimpleString("q1");
+ final SimpleString adName = new SimpleString("ad1");
+ final String sampleText = "Put me on DLQ";
+
+ AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
+ final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
+
+ server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
+
+ session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
+
+ // Send message to queue.
+ ClientProducer producer = session.createProducer(adName);
+ producer.send(createTextMessage(session, sampleText));
+ session.start();
+
+ ClientConsumer clientConsumer = session.createConsumer(qName);
+ ClientMessage clientMessage = clientConsumer.receive(500);
+ clientMessage.acknowledge();
+ Assert.assertNotNull(clientMessage);
+
+ Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+
+ // force a rollback to DLQ
+ session.rollback();
+ clientMessage = clientConsumer.receiveImmediate();
+ Assert.assertNull(clientMessage);
+ clientConsumer.close();
+
+ QueueControl queueControl = createManagementControl(dla, dlq);
+ assertMessageMetrics(queueControl, 1, true);
+ final long messageID = getFirstMessageId(queueControl);
+
+ //Delete original queue
+ session.deleteQueue(qName);
+ // Retry the message
+ queueControl.retryMessage(messageID);
+ Thread.sleep(100);
+
+ // Assert DLQ is not empty...
+ Assert.assertEquals(1, getMessageCount(queueControl));
+
+ // .. and that the message is still intact on DLQ
+ clientConsumer = session.createConsumer(dlq);
+ clientMessage = clientConsumer.receive(500);
+ clientMessage.acknowledge();
+ Assert.assertNotNull(clientMessage);
+
+ Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+ clientConsumer.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index 16cf9c9..ba6f748 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -735,4 +735,65 @@ public class ScaleDownTest extends ClusterTestBase {
removeConsumer(0);
removeConsumer(1);
}
+
+ @Test
+ public void testScaleDownMessageWithAutoCreatedDLAResources() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString queueName = new SimpleString("q1");
+ final SimpleString addressName = new SimpleString("q1");
+ final String sampleText = "Put me on DLA";
+
+ //Set up resources for Auto-created DLAs
+ AddressSettings addressSettings = servers[0].getAddressSettingsRepository().getMatch(addressName.toString());
+ servers[0].getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
+ final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(addressName).concat(addressSettings.getDeadLetterQueueSuffix());
+
+ createQueue(0, addressName.toString(), queueName.toString(), null, false, null, null, RoutingType.ANYCAST);
+
+ ClientSessionFactory sf = sfs[0];
+ ClientSession session = addClientSession(sf.createSession(true, false));
+ ClientProducer producer = addClientProducer(session.createProducer(addressName));
+
+ // Send message to queue with RoutingType header
+ ClientMessage m = createTextMessage(session, sampleText);
+ m.putByteProperty(Message.HDR_ROUTING_TYPE, (byte) 1);
+ producer.send(m);
+ session.start();
+
+ // Get message
+ ClientConsumer consumer = session.createConsumer(queueName);
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getBodyBuffer().readString(), sampleText);
+ assertTrue(message.getRoutingType() == RoutingType.ANYCAST);
+ message.acknowledge();
+
+ // force a rollback to DLA
+ session.rollback();
+ message = consumer.receiveImmediate();
+ Assert.assertNull(message);
+
+ //Make sure it ends up on DLA
+ consumer.close();
+ consumer = session.createConsumer(dlq.toString());
+ message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertTrue(message.getRoutingType() == null);
+
+ //Scale-Down
+ servers[0].stop();
+
+ //Get message on seconds node
+ sf = sfs[1];
+ session = addClientSession(sf.createSession(false, true, true));
+ consumer = session.createConsumer(dlq.toString());
+ session.start();
+
+ message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ Assert.assertEquals(sampleText, message.getBodyBuffer().readString());
+
+ consumer.close();
+ }
}