You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/31 23:12:16 UTC
[1/2] activemq-artemis git commit: This closes #1831
Repository: activemq-artemis
Updated Branches:
refs/heads/master 653175fee -> 3d79a0896
This closes #1831
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3d79a089
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3d79a089
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3d79a089
Branch: refs/heads/master
Commit: 3d79a089634a5bc7dfd213a6266c4f69d8f1eaff
Parents: 653175f 028d6f7
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jan 31 18:07:12 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 31 18:07:12 2018 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/DivertImpl.java | 4 +-
.../management/QueueControlTest.java | 76 ++++++++++++++++++--
2 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1645 diverted msg can't be
retried from DLQ
Posted by cl...@apache.org.
ARTEMIS-1645 diverted msg can't be retried from DLQ
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/028d6f71
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/028d6f71
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/028d6f71
Branch: refs/heads/master
Commit: 028d6f71efb4b43eb6c3c3b261ac48576237f2cb
Parents: 653175f
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Jan 30 22:03:38 2018 -0600
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 31 18:07:12 2018 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/DivertImpl.java | 4 +-
.../management/QueueControlTest.java | 76 ++++++++++++++++++--
2 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/028d6f71/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index e28d011..953756f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -24,7 +25,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
@@ -98,7 +98,7 @@ public class DivertImpl implements Divert {
copy = message.copy(id);
// This will set the original MessageId, and the original address
- copy.referenceOriginalMessage(message, null);
+ copy.referenceOriginalMessage(message, this.getUniqueName().toString());
copy.setAddress(forwardAddress);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/028d6f71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index f9adc2f..cbfba49 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -16,10 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
-import javax.json.JsonArray;
-import javax.json.JsonObject;
-import javax.management.Notification;
-import javax.management.openmbean.CompositeData;
+import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
+
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -28,6 +26,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.management.Notification;
+import javax.management.openmbean.CompositeData;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
@@ -47,6 +50,7 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -59,8 +63,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
-
public class QueueControlTest extends ManagementTestBase {
private ActiveMQServer server;
@@ -888,6 +890,68 @@ public class QueueControlTest extends ManagementTestBase {
}
/**
+ * Test retry - get a diverted message from DLQ and put on original queue.
+ */
+ @Test
+ public void testRetryDivertedMessage() throws Exception {
+ final SimpleString dla = new SimpleString("DLA");
+ final SimpleString dlq = new SimpleString("DLQ");
+ final SimpleString forwardingQueue = new SimpleString("forwardingQueue");
+ final SimpleString forwardingAddress = new SimpleString("forwardingAddress");
+ final SimpleString myTopic = new SimpleString("myTopic");
+ final String sampleText = "Put me on DLQ";
+
+ AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
+ server.getAddressSettingsRepository().addMatch(forwardingAddress.toString(), addressSettings);
+
+ // create the DLQ, the divert's target queue and the source topic
+ session.createQueue(dla, RoutingType.ANYCAST, dlq, null, false);
+ session.createQueue(forwardingAddress, RoutingType.ANYCAST, forwardingQueue, null, false);
+ session.createAddress(myTopic, RoutingType.MULTICAST, false);
+
+ DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
+ .setRoutingName("some-name").setAddress(myTopic.toString())
+ .setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
+ server.deployDivert(divert);
+
+ // Send message to topic; the divert forwards it to forwardingAddress.
+ ClientProducer producer = session.createProducer(myTopic);
+ producer.send(createTextMessage(session, sampleText));
+ session.start();
+
+ ClientConsumer clientConsumer = session.createConsumer(forwardingQueue);
+ ClientMessage clientMessage = clientConsumer.receive(500);
+ Assert.assertNotNull(clientMessage);
+ clientMessage.acknowledge();
+
+ Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+ // max-delivery-attempts is 1 for forwardingAddress, so a rollback sends the message to the DLA
+ session.rollback();
+ clientMessage = clientConsumer.receiveImmediate();
+ Assert.assertNull(clientMessage);
+
+ QueueControl queueControl = createManagementControl(dla, dlq, RoutingType.ANYCAST);
+ Assert.assertEquals(1, getMessageCount(queueControl));
+ final long messageID = getFirstMessageId(queueControl);
+
+ // Retry the message - i.e. it should go from DLQ to original Queue.
+ Assert.assertTrue(queueControl.retryMessage(messageID));
+
+ // Assert DLQ is empty...
+ Assert.assertEquals(0, getMessageCount(queueControl));
+
+ // .. and that the message is now on the original queue once more.
+ clientMessage = clientConsumer.receive(500);
+ Assert.assertNotNull(clientMessage); // regression check for ARTEMIS-1645
+ clientMessage.acknowledge();
+
+ Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+ clientConsumer.close();
+ }
+
+ /**
* Test retry multiple messages from DLQ to original queue.
*/
@Test