You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/21 09:57:32 UTC

svn commit: r956481 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisoryBroker.java advisory/AdvisorySupport.java broker/region/PrefetchSubscription.java

Author: rajdavies
Date: Mon Jun 21 07:57:32 2010
New Revision: 956481

URL: http://svn.apache.org/viewvc?rev=956481&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2786

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=956481&r1=956480&r2=956481&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Jun 21 07:57:32 2010
@@ -386,6 +386,21 @@ public class AdvisoryBroker extends Brok
             LOG.warn("Failed to fire message master broker advisory");
         }
     }
+    
+    /** Delegates to the superclass, then fires a body-less copy of any non-advisory message on its MessageDLQd advisory topic; a failed advisory is logged, not rethrown. */
+    @Override
+    public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
+        super.sendToDeadLetterQueue(context, messageReference);
+        try {
+            if(!messageReference.isAdvisory()) {
+                Message payload = messageReference.getMessage().copy();
+                payload.clearBody();
+                fireAdvisory(context, AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()), payload);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message DLQ'd advisory", e);
+        }
+    }
 
     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
         fireAdvisory(context, topic, command, null);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=956481&r1=956480&r2=956481&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Mon Jun 21 07:57:32 2010
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.advisory;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import org.apache.activemq.ActiveMQMessageTransformation;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
-import javax.jms.Destination;
-import javax.jms.JMSException;
 public final class AdvisorySupport {
     public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
     public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
@@ -45,6 +45,7 @@ public final class AdvisorySupport {
     public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
     public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
     public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+    public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
     public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
     public static final String AGENT_TOPIC = "ActiveMQ.Agent";
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
@@ -185,6 +186,12 @@ public final class AdvisorySupport {
                 + destination.getPhysicalName();
         return new ActiveMQTopic(name);
     }
+    
+    /** Returns the advisory topic named MESSAGE_DLQ_TOPIC_PREFIX + destination type + "." + physical name of the given destination. */
+    public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
+        return new ActiveMQTopic(MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
+                + destination.getPhysicalName());
+    }
 
     public static ActiveMQTopic getMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
         return getMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=956481&r1=956480&r2=956481&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jun 21 07:57:32 2010
@@ -457,7 +457,7 @@ public abstract class PrefetchSubscripti
      * @throws Exception
      */
     protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
-        broker.sendToDeadLetterQueue(context, node);
+        broker.getRoot().sendToDeadLetterQueue(context, node); // NOTE(review): routed via getRoot(), presumably so brokers layered above this one (e.g. AdvisoryBroker's override) also see the DLQ event - confirm
     }
     
     public int getInFlightSize() {