You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2021/12/20 14:38:36 UTC

[activemq] branch main updated: [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)

This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 05426d6  [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)
05426d6 is described below

commit 05426d637cf00e0bc289df6e0cb3a050fc606835
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:38:32 2021 -0600

    [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)
    
    - Default 'true' to match existing behavior
     - Added counter to DestinationView
---
 .../apache/activemq/broker/jmx/DestinationView.java  |  9 +++++++++
 .../activemq/broker/jmx/DestinationViewMBean.java    | 20 ++++++++++++++++++++
 .../activemq/broker/region/BaseDestination.java      | 14 +++++++++++++-
 .../apache/activemq/broker/region/Destination.java   |  4 ++++
 .../activemq/broker/region/DestinationFilter.java    | 10 ++++++++++
 .../broker/region/DestinationStatistics.java         | 11 +++++++++++
 .../activemq/broker/region/policy/PolicyEntry.java   | 17 ++++++++++++++++-
 7 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index d795c4d..db0c67f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -103,6 +103,11 @@ public class DestinationView implements DestinationViewMBean {
     }
 
     @Override
+    public long getDuplicateFromStoreCount() {
+        return destination.getDestinationStatistics().getDuplicateFromStore().getCount();
+    }
+
+    @Override
     public long getInFlightCount() {
         return destination.getDestinationStatistics().getInflight().getCount();
     }
@@ -570,4 +575,8 @@ public class DestinationView implements DestinationViewMBean {
         return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
     }
 
+    @Override
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return destination.isSendDuplicateFromStoreToDLQ();
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index ad0ae32..526e648 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -70,6 +70,26 @@ public interface DestinationViewMBean {
     long getDequeueCount();
 
     /**
+     * Returns the number of duplicate messages that have been paged-in 
+     * from the store.
+     *
+     * @return The number of duplicate messages that have been paged-in 
+     *         from the store.
+     */
+    @MBeanInfo("Number of duplicate messages that have been paged-in from the store.")
+    long getDuplicateFromStoreCount();
+
+    /**
+     * Returns the config setting to send a duplicate message from store 
+     * to the dead letter queue.
+     *
+     * @return The config setting to send a duplicate message from store 
+     *         to the dead letter queue.
+     */
+    @MBeanInfo("Config setting to send a duplicate from store message to the dead letter queue.")
+    boolean isSendDuplicateFromStoreToDLQ();
+
+    /**
      * Returns the number of messages that have been acknowledged by network subscriptions from the
      * destination.
      *
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 6d86162..1b46c16 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -88,6 +88,7 @@ public abstract class BaseDestination implements Destination {
     private boolean advisoryForDelivery;
     private boolean advisoryForConsumed;
     private boolean sendAdvisoryIfNoConsumers;
+    private boolean sendDuplicateFromStoreToDLQ = true;
     private boolean includeBodyForAdvisory;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     protected final BrokerService brokerService;
@@ -477,6 +478,14 @@ public abstract class BaseDestination implements Destination {
         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
     }
 
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return this.sendDuplicateFromStoreToDLQ;
+    }
+
+    public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+        this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
+    }
+
     public boolean isIncludeBodyForAdvisory() {
         return includeBodyForAdvisory;
     }
@@ -889,11 +898,14 @@ public abstract class BaseDestination implements Destination {
 
     @Override
     public void duplicateFromStore(Message message, Subscription subscription) {
+        destinationStatistics.getDuplicateFromStore().increment();
         ConnectionContext connectionContext = createConnectionContext();
         getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
         Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
         message.setRegionDestination(this);
-        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
+        if(this.isSendDuplicateFromStoreToDLQ()) {
+            broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
+        }
         MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
         messageAck.setPoisonCause(cause);
         try {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 031015e..180cf25 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -245,4 +245,8 @@ public interface Destination extends Service, Task, Message.MessageDestination {
     public void clearPendingMessages(int pendingAdditionsCount);
 
     void duplicateFromStore(Message message, Subscription subscription);
+
+    boolean isSendDuplicateFromStoreToDLQ();
+
+    void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 0d0db05..2c11bc3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -394,6 +394,16 @@ public class DestinationFilter implements Destination {
         next.duplicateFromStore(message, subscription);
     }
 
+    @Override
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return next.isSendDuplicateFromStoreToDLQ();
+    }
+
+    @Override
+    public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+        next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
+    }
+
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
         if (next instanceof DestinationFilter) {
             DestinationFilter filter = (DestinationFilter) next;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index 0a9176e..88dd911 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -37,6 +37,7 @@ public class DestinationStatistics extends StatsImpl {
     protected CountStatisticImpl messages;
     protected PollCountStatisticImpl messagesCached;
     protected CountStatisticImpl dispatched;
+    protected CountStatisticImpl duplicateFromStore;
     protected CountStatisticImpl inflight;
     protected CountStatisticImpl expired;
     protected TimeStatisticImpl processTime;
@@ -50,6 +51,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
         dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
+        duplicateFromStore = new CountStatisticImpl("duplicateFromStore", "The number of duplicate messages that have been paged-in from the store for this destination");
         forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
         inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
         expired = new CountStatisticImpl("expired", "The number of messages that have expired");
@@ -68,6 +70,7 @@ public class DestinationStatistics extends StatsImpl {
         addStatistic("enqueues", enqueues);
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
+        addStatistic("duplicateFromStore", duplicateFromStore);
         addStatistic("inflight", inflight);
         addStatistic("expired", expired);
         addStatistic("consumers", consumers);
@@ -124,6 +127,10 @@ public class DestinationStatistics extends StatsImpl {
         return dispatched;
     }
 
+    public CountStatisticImpl getDuplicateFromStore() {
+        return duplicateFromStore;
+    }
+
     public TimeStatisticImpl getProcessTime() {
         return this.processTime;
     }
@@ -145,6 +152,7 @@ public class DestinationStatistics extends StatsImpl {
             dequeues.reset();
             forwards.reset();
             dispatched.reset();
+            duplicateFromStore.reset();
             inflight.reset();
             expired.reset();
             blockedSends.reset();
@@ -158,6 +166,7 @@ public class DestinationStatistics extends StatsImpl {
         enqueues.setEnabled(enabled);
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
+        duplicateFromStore.setEnabled(enabled);
         forwards.setEnabled(enabled);
         inflight.setEnabled(enabled);
         expired.setEnabled(true);
@@ -177,6 +186,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(parent.enqueues);
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
+            duplicateFromStore.setParent(parent.duplicateFromStore);
             forwards.setParent(parent.forwards);
             inflight.setParent(parent.inflight);
             expired.setParent(parent.expired);
@@ -192,6 +202,7 @@ public class DestinationStatistics extends StatsImpl {
             enqueues.setParent(null);
             dispatched.setParent(null);
             dequeues.setParent(null);
+            duplicateFromStore.setParent(null);
             forwards.setParent(null);
             inflight.setParent(null);
             expired.setParent(null);
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index ab869b0..862d5dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -51,6 +51,7 @@ public class PolicyEntry extends DestinationMapEntry {
     private DispatchPolicy dispatchPolicy;
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
+    private boolean sendDuplicateFromStoreToDLQ = true;
     private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
     private PendingMessageLimitStrategy pendingMessageLimitStrategy;
     private MessageEvictionStrategy messageEvictionStrategy;
@@ -241,7 +242,6 @@ public class PolicyEntry extends DestinationMapEntry {
         if (isUpdate("maxBrowsePageSize", includedProperties)) {
             destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
         }
-
         if (isUpdate("minimumMessageSize", includedProperties)) {
             destination.setMinimumMessageSize((int) getMinimumMessageSize());
         }
@@ -296,6 +296,9 @@ public class PolicyEntry extends DestinationMapEntry {
         if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
             destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
         }
+        if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
+            destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
+        }
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -456,6 +459,18 @@ public class PolicyEntry extends DestinationMapEntry {
         this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
     }
 
+    public boolean isSendDuplicateFromStoreToDLQ() {
+        return sendDuplicateFromStoreToDLQ;
+    }
+
+    /**
+     * Sends a copy of message to DLQ if a duplicate messages are paged-in from
+     * the messages store
+     */
+    public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+        this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
+    }
+
     public DeadLetterStrategy getDeadLetterStrategy() {
         return deadLetterStrategy;
     }