You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/31 23:12:16 UTC

[1/2] activemq-artemis git commit: This closes #1831

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 653175fee -> 3d79a0896


This closes #1831


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3d79a089
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3d79a089
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3d79a089

Branch: refs/heads/master
Commit: 3d79a089634a5bc7dfd213a6266c4f69d8f1eaff
Parents: 653175f 028d6f7
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jan 31 18:07:12 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 31 18:07:12 2018 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/DivertImpl.java    |  4 +-
 .../management/QueueControlTest.java            | 76 ++++++++++++++++++--
 2 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-1645 diverted msg can't be retried from DLQ

Posted by cl...@apache.org.
ARTEMIS-1645 diverted msg can't be retried from DLQ


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/028d6f71
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/028d6f71
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/028d6f71

Branch: refs/heads/master
Commit: 028d6f71efb4b43eb6c3c3b261ac48576237f2cb
Parents: 653175f
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Jan 30 22:03:38 2018 -0600
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 31 18:07:12 2018 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/DivertImpl.java    |  4 +-
 .../management/QueueControlTest.java            | 76 ++++++++++++++++++--
 2 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/028d6f71/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index e28d011..953756f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -24,7 +25,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.jboss.logging.Logger;
 
@@ -98,7 +98,7 @@ public class DivertImpl implements Divert {
          copy = message.copy(id);
 
          // This will set the original MessageId, and the original address
-         copy.referenceOriginalMessage(message, null);
+         copy.referenceOriginalMessage(message, this.getUniqueName().toString());
 
          copy.setAddress(forwardAddress);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/028d6f71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index f9adc2f..cbfba49 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -16,10 +16,8 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
-import javax.json.JsonArray;
-import javax.json.JsonObject;
-import javax.management.Notification;
-import javax.management.openmbean.CompositeData;
+import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
+
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -28,6 +26,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.management.Notification;
+import javax.management.openmbean.CompositeData;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.Message;
@@ -47,6 +50,7 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -59,8 +63,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
-
 public class QueueControlTest extends ManagementTestBase {
 
    private ActiveMQServer server;
@@ -888,6 +890,68 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    /**
+    * Test retry - a diverted message that was dead-lettered is retried from the DLQ and must land on the divert's forwarding queue again.
+    */
+   @Test
+   public void testRetryDivertedMessage() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString dlq = new SimpleString("DLQ");
+      final SimpleString forwardingQueue = new SimpleString("forwardingQueue");
+      final SimpleString forwardingAddress = new SimpleString("forwardingAddress");
+      final SimpleString myTopic = new SimpleString("myTopic");
+      final String sampleText = "Put me on DLQ";
+
+      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
+      server.getAddressSettingsRepository().addMatch(forwardingAddress.toString(), addressSettings);
+
+      // create target queue, DLQ and source topic
+      session.createQueue(dla, RoutingType.ANYCAST, dlq, null, false);
+      session.createQueue(forwardingAddress, RoutingType.ANYCAST, forwardingQueue, null, false);
+      session.createAddress(myTopic, RoutingType.MULTICAST, false);
+
+      DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
+                                                            .setRoutingName("some-name").setAddress(myTopic.toString())
+                                                            .setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
+      server.deployDivert(divert);
+
+      // Send message to topic; the divert should copy it to the forwarding address.
+      ClientProducer producer = session.createProducer(myTopic);
+      producer.send(createTextMessage(session, sampleText));
+      session.start();
+
+      ClientConsumer clientConsumer = session.createConsumer(forwardingQueue);
+      ClientMessage clientMessage = clientConsumer.receive(500);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+      // force a rollback to DLQ (max delivery attempts is 1, so a single rollback dead-letters the message)
+      session.rollback();
+      clientMessage = clientConsumer.receiveImmediate();
+      Assert.assertNull(clientMessage);
+
+      QueueControl queueControl = createManagementControl(dla, dlq, RoutingType.ANYCAST);
+      Assert.assertEquals(1, getMessageCount(queueControl));
+      final long messageID = getFirstMessageId(queueControl);
+
+      // Retry the message - i.e. it should go from DLQ to original Queue.
+      Assert.assertTrue(queueControl.retryMessage(messageID));
+
+      // Assert DLQ is empty...
+      Assert.assertEquals(0, getMessageCount(queueControl));
+
+      // .. and that the message is now on the original queue once more.
+      clientMessage = clientConsumer.receive(500);
+      Assert.assertNotNull(clientMessage); // regression check for ARTEMIS-1645 (retry used to fail with AMQ222196)
+      clientMessage.acknowledge();
+
+      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+      clientConsumer.close();
+   }
+
+   /**
     * Test retry multiple messages from  DLQ to original queue.
     */
    @Test