You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2021/10/08 13:09:35 UTC

[activemq-artemis] branch main updated: ARTEMIS-3313 routing-type conflict during import/export

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c3d93f5  ARTEMIS-3313 routing-type conflict during import/export
     new 7109d0d  This closes #3782
c3d93f5 is described below

commit c3d93f5590b97ab31a4a46fd324b01eb4548940c
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Oct 7 20:13:38 2021 -0500

    ARTEMIS-3313 routing-type conflict during import/export
---
 .../apache/activemq/artemis/api/core/Message.java  |   8 ++
 .../core/postoffice/impl/PostOfficeImpl.java       |   2 +
 .../artemis/core/server/impl/QueueImpl.java        |  17 ++-
 .../integration/management/QueueControlTest.java   | 118 +++++++++++++++++++++
 .../tests/integration/server/ScaleDownTest.java    |  61 +++++++++++
 5 files changed, 202 insertions(+), 4 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index aba66d6..5525fee 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -162,6 +162,11 @@ public interface Message {
    SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
 
    /**
+    * The original routing type of a message before getting transferred through DLQ or expiry
+    */
+   SimpleString HDR_ORIG_ROUTING_TYPE = new SimpleString("_AMQ_ORIG_ROUTING_TYPE");
+
+   /**
     * The time at which the message arrived at the broker.
     */
    SimpleString HDR_INGRESS_TIMESTAMP = new SimpleString("_AMQ_INGRESS_TIMESTAMP");
@@ -465,6 +470,9 @@ public interface Message {
       setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
       setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
       setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
+      if (original.getRoutingType() != null) {
+         setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
+      }
 
       // reset expiry
       setExpiration(0);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 3e51f23..feaa7b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1214,6 +1214,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
             message.setAddress(dlaAddress);
 
+            message.setRoutingType(null);
+
             message.reencode();
 
             route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index fef2d4b..af3766d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2688,8 +2688,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
             String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
+            Binding binding = null;
 
-            if (originalMessageAddress != null) {
+            if (originalMessageQueue != null) {
+               binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
+            }
+
+            if (originalMessageAddress != null && binding != null) {
 
                incDelivering(ref);
 
@@ -2697,9 +2702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
                   targetQueue = queues.get(originalMessageQueue);
                   if (targetQueue == null) {
-                     Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
-
-                     if (binding != null && binding instanceof LocalQueueBinding) {
+                     if (binding instanceof LocalQueueBinding) {
                         targetQueue = ((LocalQueueBinding) binding).getID();
                         queues.put(originalMessageQueue, targetQueue);
                      }
@@ -2715,6 +2718,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                return true;
             }
 
+            ActiveMQServerLogger.LOGGER.unableToFindTargetQueue(originalMessageQueue);
             return false;
          }
       });
@@ -3387,6 +3391,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       copyMessage.setAddress(toAddress);
 
+      if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) != null) {
+         copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE)));
+      }
+
       if (queueIDs != null && queueIDs.length > 0) {
          ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
          for (long id : queueIDs) {
@@ -3543,6 +3551,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       copy.setExpiration(0);
+      copy.setRoutingType(null);
 
       if (expiry) {
          copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 9260971..d45bcff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -3637,6 +3637,124 @@ public class QueueControlTest extends ManagementTestBase {
       session.deleteQueue(queue);
    }
 
+   /**
+    * Test retry - get a message from auto-created DLA/DLQ with HDR_ORIG_RoutingType set and put on original queue.
+    */
+   @Test
+   public void testRetryMessageWithAutoCreatedResourcesAndOrigRoutingType() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString qName = new SimpleString("q1");
+      final SimpleString adName = new SimpleString("ad1");
+      final String sampleText = "Put me on DLQ";
+
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
+      final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
+
+      server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
+      session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
+
+      // Send message to queue.
+      ClientProducer producer = session.createProducer(adName);
+      ClientMessage m = createTextMessage(session, sampleText);
+
+      // Set ORIG RoutingType header
+      m.putByteProperty(Message.HDR_ORIG_ROUTING_TYPE, (byte) 1);
+      producer.send(m);
+      session.start();
+
+      ClientConsumer clientConsumer = session.createConsumer(qName);
+      ClientMessage clientMessage = clientConsumer.receive(500);
+      clientMessage.acknowledge();
+      Assert.assertNotNull(clientMessage);
+
+      Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+
+      // force a rollback to DLQ
+      session.rollback();
+      clientMessage = clientConsumer.receiveImmediate();
+      Assert.assertNull(clientMessage);
+
+      QueueControl queueControl = createManagementControl(dla, dlq);
+      assertMessageMetrics(queueControl, 1, true);
+      final long messageID = getFirstMessageId(queueControl);
+
+      // Retry the message - i.e. it should go from DLQ to original Queue.
+      Assert.assertTrue(queueControl.retryMessage(messageID));
+
+      // Assert DLQ is empty...
+      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
+
+      // .. and that the message is now on the original queue with ORIG RoutingType set as RoutingType
+      clientMessage = clientConsumer.receive(500);
+      clientMessage.acknowledge();
+      assertTrue(clientMessage.getRoutingType() == RoutingType.ANYCAST);
+      Assert.assertNotNull(clientMessage);
+
+      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+      clientConsumer.close();
+   }
+
+   /**
+    * Test retry - get a message from auto-created DLA/DLQ and put on original queue.
+    */
+   @Test
+   public void testRetryMessageReturnedWhenNoOrigQueue() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString qName = new SimpleString("q1");
+      final SimpleString adName = new SimpleString("ad1");
+      final String sampleText = "Put me on DLQ";
+
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
+      final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
+
+      server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
+
+      session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
+
+      // Send message to queue.
+      ClientProducer producer = session.createProducer(adName);
+      producer.send(createTextMessage(session, sampleText));
+      session.start();
+
+      ClientConsumer clientConsumer = session.createConsumer(qName);
+      ClientMessage clientMessage = clientConsumer.receive(500);
+      clientMessage.acknowledge();
+      Assert.assertNotNull(clientMessage);
+
+      Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+
+      // force a rollback to DLQ
+      session.rollback();
+      clientMessage = clientConsumer.receiveImmediate();
+      Assert.assertNull(clientMessage);
+      clientConsumer.close();
+
+      QueueControl queueControl = createManagementControl(dla, dlq);
+      assertMessageMetrics(queueControl, 1, true);
+      final long messageID = getFirstMessageId(queueControl);
+
+      //Delete original queue
+      session.deleteQueue(qName);
+      // Retry the message
+      queueControl.retryMessage(messageID);
+      Thread.sleep(100);
+
+      // Assert DLQ is not empty...
+      Assert.assertEquals(1, getMessageCount(queueControl));
+
+      // .. and that the message is still intact on DLQ
+      clientConsumer = session.createConsumer(dlq);
+      clientMessage = clientConsumer.receive(500);
+      clientMessage.acknowledge();
+      Assert.assertNotNull(clientMessage);
+
+      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+      clientConsumer.close();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index 16cf9c9..ba6f748 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -735,4 +735,65 @@ public class ScaleDownTest extends ClusterTestBase {
       removeConsumer(0);
       removeConsumer(1);
    }
+
+   @Test
+   public void testScaleDownMessageWithAutoCreatedDLAResources() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString queueName = new SimpleString("q1");
+      final SimpleString addressName = new SimpleString("q1");
+      final String sampleText = "Put me on DLA";
+
+      //Set up resources for Auto-created DLAs
+      AddressSettings addressSettings = servers[0].getAddressSettingsRepository().getMatch(addressName.toString());
+      servers[0].getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
+      final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(addressName).concat(addressSettings.getDeadLetterQueueSuffix());
+
+      createQueue(0, addressName.toString(), queueName.toString(), null, false, null, null, RoutingType.ANYCAST);
+
+      ClientSessionFactory sf = sfs[0];
+      ClientSession session = addClientSession(sf.createSession(true, false));
+      ClientProducer producer = addClientProducer(session.createProducer(addressName));
+
+      // Send message to queue with RoutingType header
+      ClientMessage m = createTextMessage(session, sampleText);
+      m.putByteProperty(Message.HDR_ROUTING_TYPE, (byte) 1);
+      producer.send(m);
+      session.start();
+
+      // Get message
+      ClientConsumer consumer = session.createConsumer(queueName);
+      ClientMessage message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals(message.getBodyBuffer().readString(), sampleText);
+      assertTrue(message.getRoutingType() == RoutingType.ANYCAST);
+      message.acknowledge();
+
+      // force a rollback to DLA
+      session.rollback();
+      message = consumer.receiveImmediate();
+      Assert.assertNull(message);
+
+      //Make sure it ends up on DLA
+      consumer.close();
+      consumer = session.createConsumer(dlq.toString());
+      message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      assertTrue(message.getRoutingType() == null);
+
+      //Scale-Down
+      servers[0].stop();
+
+      //Get message on seconds node
+      sf = sfs[1];
+      session = addClientSession(sf.createSession(false, true, true));
+      consumer = session.createConsumer(dlq.toString());
+      session.start();
+
+      message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      message.acknowledge();
+      Assert.assertEquals(sampleText, message.getBodyBuffer().readString());
+
+      consumer.close();
+   }
 }