You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/07/31 13:50:07 UTC

svn commit: r1508819 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq: advisory/ broker/ broker/region/ broker/util/ plugin/

Author: gtully
Date: Wed Jul 31 11:50:06 2013
New Revision: 1508819

URL: http://svn.apache.org/r1508819
Log:
intermittent hang of test for https://issues.apache.org/jira/browse/AMQ-2021 showed concurrency flaw in https://issues.apache.org/jira/browse/AMQ-3236 - properties are only applied to the message copy that is deferred till needed

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Jul 31 11:50:06 2013
@@ -417,8 +417,8 @@ public class AdvisoryBroker extends Brok
 
     @Override
     public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                         Subscription subscription) {
-        boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription);
+                                         Subscription subscription, Throwable poisonCause) {
+        boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
         if (wasDLQd) {
             try {
                 if(!messageReference.isAdvisory()) {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java Wed Jul 31 11:50:06 2013
@@ -317,13 +317,13 @@ public interface Broker extends Region, 
     /**
      * A message needs to go the a DLQ
      *
+     *
      * @param context
      * @param messageReference
-     * @param subscription, may be null
-     *
+     * @param poisonCause reason for dlq submission, may be null
      * @return true if Message was placed in a DLQ false if discarded.
      */
-    boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription);
+    boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause);
 
     /**
      * @return the broker sequence id

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java Wed Jul 31 11:50:06 2013
@@ -309,8 +309,8 @@ public class BrokerFilter implements Bro
 
     @Override
     public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                      Subscription subscription) {
-        return next.sendToDeadLetterQueue(context, messageReference, subscription);
+                                         Subscription subscription, Throwable poisonCause) {
+        return next.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java Wed Jul 31 11:50:06 2013
@@ -302,7 +302,7 @@ public class EmptyBroker implements Brok
 
     @Override
     public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                         Subscription subscription) {
+                                         Subscription subscription, Throwable poisonCause) {
         return false;
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java Wed Jul 31 11:50:06 2013
@@ -312,7 +312,7 @@ public class ErrorBroker implements Brok
 
     @Override
     public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                         Subscription subscription) {
+                                         Subscription subscription, Throwable poisonCause) {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Wed Jul 31 11:50:06 2013
@@ -321,8 +321,8 @@ public class MutableBrokerFilter impleme
 
     @Override
     public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                         Subscription subscription) {
-        return getNext().sendToDeadLetterQueue(context, messageReference, subscription);
+                                         Subscription subscription, Throwable poisonCause) {
+        return getNext().sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jul 31 11:50:06 2013
@@ -376,11 +376,7 @@ public abstract class PrefetchSubscripti
                         inAckRange = true;
                     }
                     if (inAckRange) {
-                        if (ack.getPoisonCause() != null) {
-                            node.getMessage().setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
-                                    ack.getPoisonCause().toString());
-                        }
-                        sendToDLQ(context, node);
+                        sendToDLQ(context, node, ack.getPoisonCause());
                         Destination nodeDest = (Destination) node.getRegionDestination();
                         nodeDest.getDestinationStatistics()
                                 .getInflight().decrement();
@@ -501,13 +497,15 @@ public abstract class PrefetchSubscripti
     }
 
     /**
+     *
      * @param context
      * @param node
+     * @param poisonCause
      * @throws IOException
      * @throws Exception
      */
-    protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
-        broker.getRoot().sendToDeadLetterQueue(context, node, this);
+    protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception {
+        broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause);
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jul 31 11:50:06 2013
@@ -44,6 +44,7 @@ import org.apache.activemq.broker.Transp
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionId;
@@ -698,11 +699,11 @@ public class RegionBroker extends EmptyB
         if (LOG.isDebugEnabled()) {
             LOG.debug("Message expired " + node);
         }
-        getRoot().sendToDeadLetterQueue(context, node, subscription);
+        getRoot().sendToDeadLetterQueue(context, node, subscription, new Throwable("Message Expired. Expiration:" + node.getExpiration()));
     }
 
     @Override
-    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription, Throwable poisonCause) {
         try {
             if (node != null) {
                 Message message = node.getMessage();
@@ -718,6 +719,10 @@ public class RegionBroker extends EmptyB
                                 message.setPersistent(true);
                                 message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
                             }
+                            if (poisonCause != null) {
+                                message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
+                                        poisonCause.toString());
+                            }
                             // The original destination and transaction id do
                             // not get filled when the message is first sent,
                             // it is only populated if the message is routed to

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Jul 31 11:50:06 2013
@@ -630,7 +630,7 @@ public class TopicSubscription extends A
         if (dest != null) {
             dest.messageDiscarded(getContext(), this, message);
         }
-        broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
+        broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java Wed Jul 31 11:50:06 2013
@@ -500,7 +500,7 @@ public class LoggingBrokerPlugin extends
 
     @Override
     public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
-                                      Subscription subscription) {
+                                         Subscription subscription, Throwable poisonCause) {
         if (isLogAll() || isLogInternalEvents()) {
             String msg = "Unable to display message.";
 
@@ -508,7 +508,7 @@ public class LoggingBrokerPlugin extends
 
             LOG.info("Sending to DLQ : " + msg);
         }
-        return super.sendToDeadLetterQueue(context, messageReference, subscription);
+        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
     }
 
     @Override

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/RedeliveryPlugin.java Wed Jul 31 11:50:06 2013
@@ -127,10 +127,10 @@ public class RedeliveryPlugin extends Br
     }
 
     @Override
-    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
+    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
         if (messageReference.isExpired()) {
             // there are two uses of  sendToDeadLetterQueue, we are only interested in valid messages
-            return super.sendToDeadLetterQueue(context, messageReference, subscription);
+            return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
         } else {
             try {
                 Destination regionDestination = (Destination) messageReference.getRegionDestination();
@@ -146,12 +146,12 @@ public class RedeliveryPlugin extends Br
 
                         scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
                     } else if (isSendToDlqIfMaxRetriesExceeded()) {
-                        return super.sendToDeadLetterQueue(context, messageReference, subscription);
+                        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
                     } else {
                         LOG.debug("Discarding message that exceeds max redelivery count( " + maximumRedeliveries + "), " + messageReference.getMessageId());
                     }
                 } else if (isFallbackToDeadLetter()) {
-                    return super.sendToDeadLetterQueue(context, messageReference, subscription);
+                    return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
                 } else {
                     LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + regionDestination.getActiveMQDestination());
                 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java?rev=1508819&r1=1508818&r2=1508819&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java Wed Jul 31 11:50:06 2013
@@ -44,7 +44,7 @@ public class DiscardingDLQBroker extends
     }
 
     @Override
-    public boolean sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, Subscription subscription) {
+    public boolean sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef, Subscription subscription, Throwable poisonCause) {
         if (log.isTraceEnabled()) {
             log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
         }
@@ -73,7 +73,7 @@ public class DiscardingDLQBroker extends
             skipMessage("dropOnly", msgRef);
         } else {
             dropped = false;
-            return next.sendToDeadLetterQueue(ctx, msgRef, subscription);
+            return next.sendToDeadLetterQueue(ctx, msgRef, subscription, poisonCause);
         }
 
         if (dropped && getReportInterval() > 0) {