You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2021/12/20 14:38:36 UTC
[activemq] branch main updated: [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)
This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 05426d6 [AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)
05426d6 is described below
commit 05426d637cf00e0bc289df6e0cb3a050fc606835
Author: Matt Pavlovich <ma...@hyte.io>
AuthorDate: Mon Dec 20 08:38:32 2021 -0600
[AMQ-8397] Add a destination option to sendDuplicateFromStoreToDLQ (#724)
- Default 'true' to match existing behavior
- Added counter to DestinationView
---
.../apache/activemq/broker/jmx/DestinationView.java | 9 +++++++++
.../activemq/broker/jmx/DestinationViewMBean.java | 20 ++++++++++++++++++++
.../activemq/broker/region/BaseDestination.java | 14 +++++++++++++-
.../apache/activemq/broker/region/Destination.java | 4 ++++
.../activemq/broker/region/DestinationFilter.java | 10 ++++++++++
.../broker/region/DestinationStatistics.java | 11 +++++++++++
.../activemq/broker/region/policy/PolicyEntry.java | 17 ++++++++++++++++-
7 files changed, 83 insertions(+), 2 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index d795c4d..db0c67f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -103,6 +103,11 @@ public class DestinationView implements DestinationViewMBean {
}
@Override
+ public long getDuplicateFromStoreCount() {
+ return destination.getDestinationStatistics().getDuplicateFromStore().getCount();
+ }
+
+ @Override
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
}
@@ -570,4 +575,8 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
}
+ @Override
+ public boolean isSendDuplicateFromStoreToDLQ() {
+ return destination.isSendDuplicateFromStoreToDLQ();
+ }
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index ad0ae32..526e648 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -70,6 +70,26 @@ public interface DestinationViewMBean {
long getDequeueCount();
/**
+ * Returns the number of duplicate messages that have been paged-in
+ * from the store.
+ *
+ * @return The number of duplicate messages that have been paged-in
+ * from the store.
+ */
+ @MBeanInfo("Number of duplicate messages that have been paged-in from the store.")
+ long getDuplicateFromStoreCount();
+
+ /**
+ * Returns the config setting to send a duplicate message from store
+ * to the dead letter queue.
+ *
+ * @return The config setting to send a duplicate message from store
+ * to the dead letter queue.
+ */
+ @MBeanInfo("Config setting to send a duplicate from store message to the dead letter queue.")
+ boolean isSendDuplicateFromStoreToDLQ();
+
+ /**
* Returns the number of messages that have been acknowledged by network subscriptions from the
* destination.
*
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 6d86162..1b46c16 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -88,6 +88,7 @@ public abstract class BaseDestination implements Destination {
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
private boolean sendAdvisoryIfNoConsumers;
+ private boolean sendDuplicateFromStoreToDLQ = true;
private boolean includeBodyForAdvisory;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
@@ -477,6 +478,14 @@ public abstract class BaseDestination implements Destination {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
+ public boolean isSendDuplicateFromStoreToDLQ() {
+ return this.sendDuplicateFromStoreToDLQ;
+ }
+
+ public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+ this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
+ }
+
public boolean isIncludeBodyForAdvisory() {
return includeBodyForAdvisory;
}
@@ -889,11 +898,14 @@ public abstract class BaseDestination implements Destination {
@Override
public void duplicateFromStore(Message message, Subscription subscription) {
+ destinationStatistics.getDuplicateFromStore().increment();
ConnectionContext connectionContext = createConnectionContext();
getLog().warn("{}{}, redirecting {} for dlq processing", DUPLICATE_FROM_STORE_MSG_PREFIX, destination, message.getMessageId());
Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
message.setRegionDestination(this);
- broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
+ if(this.isSendDuplicateFromStoreToDLQ()) {
+ broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
+ }
MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
messageAck.setPoisonCause(cause);
try {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 031015e..180cf25 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -245,4 +245,8 @@ public interface Destination extends Service, Task, Message.MessageDestination {
public void clearPendingMessages(int pendingAdditionsCount);
void duplicateFromStore(Message message, Subscription subscription);
+
+ boolean isSendDuplicateFromStoreToDLQ();
+
+ void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 0d0db05..2c11bc3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -394,6 +394,16 @@ public class DestinationFilter implements Destination {
next.duplicateFromStore(message, subscription);
}
+ @Override
+ public boolean isSendDuplicateFromStoreToDLQ() {
+ return next.isSendDuplicateFromStoreToDLQ();
+ }
+
+ @Override
+ public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+ next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
+ }
+
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index 0a9176e..88dd911 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -37,6 +37,7 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
+ protected CountStatisticImpl duplicateFromStore;
protected CountStatisticImpl inflight;
protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
@@ -50,6 +51,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
+ duplicateFromStore = new CountStatisticImpl("duplicateFromStore", "The number of duplicate messages that have been paged-in from the store for this destination");
forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
expired = new CountStatisticImpl("expired", "The number of messages that have expired");
@@ -68,6 +70,7 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
+ addStatistic("duplicateFromStore", duplicateFromStore);
addStatistic("inflight", inflight);
addStatistic("expired", expired);
addStatistic("consumers", consumers);
@@ -124,6 +127,10 @@ public class DestinationStatistics extends StatsImpl {
return dispatched;
}
+ public CountStatisticImpl getDuplicateFromStore() {
+ return duplicateFromStore;
+ }
+
public TimeStatisticImpl getProcessTime() {
return this.processTime;
}
@@ -145,6 +152,7 @@ public class DestinationStatistics extends StatsImpl {
dequeues.reset();
forwards.reset();
dispatched.reset();
+ duplicateFromStore.reset();
inflight.reset();
expired.reset();
blockedSends.reset();
@@ -158,6 +166,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
+ duplicateFromStore.setEnabled(enabled);
forwards.setEnabled(enabled);
inflight.setEnabled(enabled);
expired.setEnabled(true);
@@ -177,6 +186,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
+ duplicateFromStore.setParent(parent.duplicateFromStore);
forwards.setParent(parent.forwards);
inflight.setParent(parent.inflight);
expired.setParent(parent.expired);
@@ -192,6 +202,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null);
+ duplicateFromStore.setParent(null);
forwards.setParent(null);
inflight.setParent(null);
expired.setParent(null);
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index ab869b0..862d5dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -51,6 +51,7 @@ public class PolicyEntry extends DestinationMapEntry {
private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
+ private boolean sendDuplicateFromStoreToDLQ = true;
private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
private PendingMessageLimitStrategy pendingMessageLimitStrategy;
private MessageEvictionStrategy messageEvictionStrategy;
@@ -241,7 +242,6 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("maxBrowsePageSize", includedProperties)) {
destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
}
-
if (isUpdate("minimumMessageSize", includedProperties)) {
destination.setMinimumMessageSize((int) getMinimumMessageSize());
}
@@ -296,6 +296,9 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
}
+ if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
+ destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
+ }
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -456,6 +459,18 @@ public class PolicyEntry extends DestinationMapEntry {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
}
+ public boolean isSendDuplicateFromStoreToDLQ() {
+ return sendDuplicateFromStoreToDLQ;
+ }
+
+ /**
+ * Sends a copy of message to DLQ if a duplicate messages are paged-in from
+ * the messages store
+ */
+ public void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ) {
+ this.sendDuplicateFromStoreToDLQ = sendDuplicateFromStoreToDLQ;
+ }
+
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}