You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/07/31 13:50:07 UTC
svn commit: r1508819 - in
/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq:
advisory/ broker/ broker/region/ broker/util/ plugin/
Author: gtully
Date: Wed Jul 31 11:50:06 2013
New Revision: 1508819
URL: http://svn.apache.org/r1508819
Log:
Intermittent hang of the test for https://issues.apache.org/jira/browse/AMQ-2021 showed a concurrency flaw in https://issues.apache.org/jira/browse/AMQ-3236 - properties are only applied to the message copy, which is deferred until needed
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Jul 31 11:50:06 2013
@@ -417,8 +417,8 @@ public class AdvisoryBroker extends Brok
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
- Subscription subscription) {
- boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription);
+ Subscription subscription, Throwable poisonCause) {
+ boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
if (wasDLQd) {
try {
if(!messageReference.isAdvisory()) {
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java Wed Jul 31 11:50:06 2013
@@ -317,13 +317,13 @@ public interface Broker extends Region,
/**
* A message needs to go the a DLQ
*
+ *
* @param context
* @param messageReference
- * @param subscription, may be null
- *
+ * @param poisonCause reason for dlq submission, may be null
* @return true if Message was placed in a DLQ false if discarded.
*/
- boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription);
+ boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause);
/**
* @return the broker sequence id
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java Wed Jul 31 11:50:06 2013
@@ -309,8 +309,8 @@ public class BrokerFilter implements Bro
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
- Subscription subscription) {
- return next.sendToDeadLetterQueue(context, messageReference, subscription);
+ Subscription subscription, Throwable poisonCause) {
+ return next.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
}
@Override
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java Wed Jul 31 11:50:06 2013
@@ -302,7 +302,7 @@ public class EmptyBroker implements Brok
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
- Subscription subscription) {
+ Subscription subscription, Throwable poisonCause) {
return false;
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java Wed Jul 31 11:50:06 2013
@@ -312,7 +312,7 @@ public class ErrorBroker implements Brok
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
- Subscription subscription) {
+ Subscription subscription, Throwable poisonCause) {
throw new BrokerStoppedException(this.message);
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Wed Jul 31 11:50:06 2013
@@ -321,8 +321,8 @@ public class MutableBrokerFilter impleme
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
- Subscription subscription) {
- return getNext().sendToDeadLetterQueue(context, messageReference, subscription);
+ Subscription subscription, Throwable poisonCause) {
+ return getNext().sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
}
@Override
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jul 31 11:50:06 2013
@@ -376,11 +376,7 @@ public abstract class PrefetchSubscripti
inAckRange = true;
}
if (inAckRange) {
- if (ack.getPoisonCause() != null) {
- node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
- ack.getPoisonCause().toString());
- }
- sendToDLQ(context, node);
+ sendToDLQ(context, node, ack.getPoisonCause());
Destination nodeDest = (Destination) node.getRegionDestination();
nodeDest.getDestinationStatistics()
.getInflight().decrement();
@@ -501,13 +497,15 @@ public abstract class PrefetchSubscripti
}
/**
+ *
* @param context
* @param node
+ * @param poisonCause
* @throws IOException
* @throws Exception
*/
- protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
- broker.getRoot().sendToDeadLetterQueue(context, node, this);
+ protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
+ broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
}
@Override
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jul 31 11:50:06 2013
@@ -44,6 +44,7 @@ import org.apache.activemq.broker.Transp
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionId;
@@ -698,11 +699,11 @@ public class RegionBroker extends EmptyB
if (LOG.isDebugEnabled()) {
LOG.debug("Message expired " + node);
}
- getRoot().sendToDeadLetterQueue(context, node, subscription);
+ getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration()));
}
@Override
- public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
+ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) {
try {
if (node != null) {
Message message = node.getMessage();
@@ -718,6 +719,10 @@ public class RegionBroker extends EmptyB
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
}
+ if (poisonCause != null) {
+ message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
+ poisonCause.toString());
+ }
// The original destination and transaction id do
// not get filled when the message is first sent,
// it is only populated if the message is routed to
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Jul 31 11:50:06 2013
@@ -630,7 +630,7 @@ public class TopicSubscription extends A
if (dest != null) {
dest.messageDiscarded(getContext(), this, message);
}
- broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
+ broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
}
@Override
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java Wed Jul 31 11:50:06 2013
@@ -500,7 +500,7 @@ public class LoggingBrokerPlugin extends
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
- Subscription subscription) {
+ Subscription subscription, Throwable poisonCause) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
@@ -508,7 +508,7 @@ public class LoggingBrokerPlugin extends
LOG.info("Sending to DLQ : " + msg);
}
- return super.sendToDeadLetterQueue(context, messageReference, subscription);
+ return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
}
@Override
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java Wed Jul 31 11:50:06 2013
@@ -127,10 +127,10 @@ public class RedeliveryPlugin extends Br
}
@Override
- public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
+ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
if (messageReference.isExpired()) {
// there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
- return super.sendToDeadLetterQueue(context, messageReference, subscription);
+ return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
} else {
try {
Destination regionDestination = (Destination) messageReference.getRegionDestination();
@@ -146,12 +146,12 @@ public class RedeliveryPlugin extends Br
scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
} else if (isSendToDlqIfMaxRetriesExceeded()) {
- return super.sendToDeadLetterQueue(context, messageReference, subscription);
+ return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
} else {
LOG.debug("Discarding message that exceeds max redelivery count( " + maximumRedeliveries + "), " + messageReference.getMessageId());
}
} else if (isFallbackToDeadLetter()) {
- return super.sendToDeadLetterQueue(context, messageReference, subscription);
+ return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
} else {
LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + regionDestination.getActiveMQDestination());
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java Wed Jul 31 11:50:06 2013
@@ -44,7 +44,7 @@ public class DiscardingDLQBroker extends
}
@Override
- public boolean sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, Subscription subscription) {
+ public boolean sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, Subscription subscription, Throwable poisonCause) {
if (log.isTraceEnabled()) {
log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
}
@@ -73,7 +73,7 @@ public class DiscardingDLQBroker extends
skipMessage("dropOnly", msgRef);
} else {
dropped = false;
- return next.sendToDeadLetterQueue(ctx, msgRef, subscription);
+ return next.sendToDeadLetterQueue(ctx, msgRef, subscription, poisonCause);
}
if (dropped && getReportInterval() > 0) {