You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/09/26 10:25:19 UTC

activemq git commit: AMQ-7062 - have redelivery plugin ignore messages detected as duplicates

Repository: activemq
Updated Branches:
  refs/heads/master cae66f5d3 -> 524615128


AMQ-7062 - have redelivery plugin ignore messages detected as duplicates


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52461512
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52461512
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52461512

Branch: refs/heads/master
Commit: 5246151288873d05e2a892afe7d42f3ee0e44275
Parents: cae66f5
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 26 11:22:59 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 26 11:22:59 2018 +0100

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java |  9 +--
 .../activemq/broker/util/RedeliveryPlugin.java  |  6 +-
 .../activemq/broker/BrokerRedeliveryTest.java   | 58 ++++++++++++++++++--
 3 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/52461512/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 5ab6737..95c7e15 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -59,6 +59,7 @@ public abstract class BaseDestination implements Destination {
     public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
     public static final int MAX_PRODUCERS_TO_AUDIT = 64;
     public static final int MAX_AUDIT_DEPTH = 10000;
+    public static final String DUPLICATE_FROM_STORE_MSG_PREFIX = "duplicate from store for ";
 
     protected final AtomicBoolean started = new AtomicBoolean();
     protected final ActiveMQDestination destination;
@@ -881,16 +882,16 @@ public abstract class BaseDestination implements Destination {
     }
 
     @Override
-    public void duplicateFromStore(Message message, Subscription durableSub) {
+    public void duplicateFromStore(Message message, Subscription subscription) {
         ConnectionContext connectionContext = createConnectionContext();
-        getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
-        Throwable cause = new Throwable("duplicate from store for " + destination);
+        getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
+        Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
         message.setRegionDestination(this);
         broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
         MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
         messageAck.setPoisonCause(cause);
         try {
-            acknowledge(connectionContext, durableSub, messageAck, message);
+            acknowledge(connectionContext, subscription, messageAck, message);
         } catch (IOException e) {
             getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/52461512/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
index f270744..8f66890 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
@@ -38,6 +38,8 @@ import org.apache.activemq.state.ProducerState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.activemq.broker.region.BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX;
+
 /**
  * Replace regular DLQ handling with redelivery via a resend to the original destination
  * after a delay
@@ -128,8 +130,8 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
 
     @Override
     public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
-        if (messageReference.isExpired()) {
-            // there are two uses of  sendToDeadLetterQueue, we are only interested in valid messages
+        if (messageReference.isExpired() || (poisonCause != null && poisonCause.getMessage() != null && poisonCause.getMessage().contains(DUPLICATE_FROM_STORE_MSG_PREFIX))) {
+            // there are three uses of  sendToDeadLetterQueue, we are only interested in valid messages
             return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
         } else {
             try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/52461512/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
index 9971e8e..90e33ab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
@@ -26,6 +26,8 @@ import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.broker.util.RedeliveryPlugin;
@@ -129,6 +131,46 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
         assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
     }
 
+    public void testNoScheduledRedeliveryOfDuplicates() throws Exception {
+        broker = createBroker(true);
+
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setUseCache(false); // disable the cache such that duplicates are not suppressed on send
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+
+        ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
+        consumerConnection.start();
+        Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+        ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
+        producerConnection.start();
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(destination);
+        Message message = producerSession.createMessage();
+        message.setStringProperty("data", data);
+        producer.send(message);
+
+        message = consumer.receive(1000);
+        assertNotNull("got message", message);
+        message.acknowledge();
+
+        // send it again
+        // should go to dlq as a duplicate from the store
+        producerConnection.getTransport().request(message);
+
+        // validate DLQ
+        MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+        Message dlqMessage = dlqConsumer.receive(4000);
+        assertNotNull("Got message from dql", dlqMessage);
+        assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
+    }
+
     private void sendMessage(int timeToLive) throws Exception {
         ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
         producerConnection.start();
@@ -144,8 +186,16 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
     }
 
     private void startBroker(boolean deleteMessages) throws Exception {
+        broker = createBroker(false);
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        broker.start();
+    }
+
+    private BrokerService createBroker(boolean persistent) throws Exception {
         broker = new BrokerService();
-        broker.setPersistent(false);
+        broker.setPersistent(persistent);
         broker.setSchedulerSupport(true);
 
         RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
@@ -160,11 +210,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
         redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
 
         broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
-
-        if (deleteMessages) {
-            broker.setDeleteAllMessagesOnStartup(true);
-        }
-        broker.start();
+        return broker;
     }
 
     private void stopBroker() throws Exception {