You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/09/26 10:25:19 UTC
activemq git commit: AMQ-7062 - have redelivery plugin ignore
messages detected as duplicates
Repository: activemq
Updated Branches:
refs/heads/master cae66f5d3 -> 524615128
AMQ-7062 - have redelivery plugin ignore messages detected as duplicates
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52461512
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52461512
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52461512
Branch: refs/heads/master
Commit: 5246151288873d05e2a892afe7d42f3ee0e44275
Parents: cae66f5
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 26 11:22:59 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 26 11:22:59 2018 +0100
----------------------------------------------------------------------
.../activemq/broker/region/BaseDestination.java | 9 +--
.../activemq/broker/util/RedeliveryPlugin.java | 6 +-
.../activemq/broker/BrokerRedeliveryTest.java | 58 ++++++++++++++++++--
3 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/52461512/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 5ab6737..95c7e15 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -59,6 +59,7 @@ public abstract class BaseDestination implements Destination {
public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
public static final int MAX_PRODUCERS_TO_AUDIT = 64;
public static final int MAX_AUDIT_DEPTH = 10000;
+ public static final String DUPLICATE_FROM_STORE_MSG_PREFIX = "duplicate from store for ";
protected final AtomicBoolean started = new AtomicBoolean();
protected final ActiveMQDestination destination;
@@ -881,16 +882,16 @@ public abstract class BaseDestination implements Destination {
}
@Override
- public void duplicateFromStore(Message message, Subscription durableSub) {
+ public void duplicateFromStore(Message message, Subscription subscription) {
ConnectionContext connectionContext = createConnectionContext();
- getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
- Throwable cause = new Throwable("duplicate from store for " + destination);
+ getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
+ Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
message.setRegionDestination(this);
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
messageAck.setPoisonCause(cause);
try {
- acknowledge(connectionContext, durableSub, messageAck, message);
+ acknowledge(connectionContext, subscription, messageAck, message);
} catch (IOException e) {
getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/52461512/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
index f270744..8f66890 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
@@ -38,6 +38,8 @@ import org.apache.activemq.state.ProducerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.activemq.broker.region.BaseDestination.DUPLICATE_FROM_STORE_MSG_PREFIX;
+
/**
* Replace regular DLQ handling with redelivery via a resend to the original destination
* after a delay
@@ -128,8 +130,8 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
- if (messageReference.isExpired()) {
- // there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
+ if (messageReference.isExpired() || (poisonCause != null && poisonCause.getMessage() != null && poisonCause.getMessage().contains(DUPLICATE_FROM_STORE_MSG_PREFIX))) {
+ // there are three uses of sendToDeadLetterQueue, we are only interested in valid messages
return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
} else {
try {
http://git-wip-us.apache.org/repos/asf/activemq/blob/52461512/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
index 9971e8e..90e33ab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
@@ -26,6 +26,8 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.util.RedeliveryPlugin;
@@ -129,6 +131,46 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
}
+ public void testNoScheduledRedeliveryOfDuplicates() throws Exception {
+ broker = createBroker(true);
+
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setUseCache(false); // disable the cache such that duplicates are not suppressed on send
+
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.start();
+
+ ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
+ consumerConnection.start();
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+ ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
+ producerConnection.start();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(destination);
+ Message message = producerSession.createMessage();
+ message.setStringProperty("data", data);
+ producer.send(message);
+
+ message = consumer.receive(1000);
+ assertNotNull("got message", message);
+ message.acknowledge();
+
+ // send it again
+ // should go to dlq as a duplicate from the store
+ producerConnection.getTransport().request(message);
+
+ // validate DLQ
+ MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+ Message dlqMessage = dlqConsumer.receive(4000);
+ assertNotNull("Got message from dql", dlqMessage);
+ assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
+ }
+
private void sendMessage(int timeToLive) throws Exception {
ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
producerConnection.start();
@@ -144,8 +186,16 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
}
private void startBroker(boolean deleteMessages) throws Exception {
+ broker = createBroker(false);
+ if (deleteMessages) {
+ broker.setDeleteAllMessagesOnStartup(true);
+ }
+ broker.start();
+ }
+
+ private BrokerService createBroker(boolean persistent) throws Exception {
broker = new BrokerService();
- broker.setPersistent(false);
+ broker.setPersistent(persistent);
broker.setSchedulerSupport(true);
RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
@@ -160,11 +210,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
-
- if (deleteMessages) {
- broker.setDeleteAllMessagesOnStartup(true);
- }
- broker.start();
+ return broker;
}
private void stopBroker() throws Exception {