You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/21 09:57:32 UTC
svn commit: r956481 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
advisory/AdvisoryBroker.java advisory/AdvisorySupport.java
broker/region/PrefetchSubscription.java
Author: rajdavies
Date: Mon Jun 21 07:57:32 2010
New Revision: 956481
URL: http://svn.apache.org/viewvc?rev=956481&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2786
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=956481&r1=956480&r2=956481&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Jun 21 07:57:32 2010
@@ -386,6 +386,21 @@ public class AdvisoryBroker extends Brok
LOG.warn("Failed to fire message master broker advisory");
}
}
+
+ @Override
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
+ super.sendToDeadLetterQueue(context, messageReference);
+ try {
+ if(!messageReference.isAdvisory()) {
+ ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
+ Message payload = messageReference.getMessage().copy();
+ payload.clearBody();
+ fireAdvisory(context, topic,payload);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to fire message consumed advisory");
+ }
+ }
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
fireAdvisory(context, topic, command, null);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=956481&r1=956480&r2=956481&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Mon Jun 21 07:57:32 2010
@@ -16,11 +16,11 @@
*/
package org.apache.activemq.advisory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
-import javax.jms.Destination;
-import javax.jms.JMSException;
public final class AdvisorySupport {
public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
@@ -45,6 +45,7 @@ public final class AdvisorySupport {
public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+ public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
@@ -185,6 +186,12 @@ public final class AdvisorySupport {
+ destination.getPhysicalName();
return new ActiveMQTopic(name);
}
+
+ public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
+ String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
+ + destination.getPhysicalName();
+ return new ActiveMQTopic(name);
+ }
public static ActiveMQTopic getMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
return getMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=956481&r1=956480&r2=956481&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jun 21 07:57:32 2010
@@ -457,7 +457,7 @@ public abstract class PrefetchSubscripti
* @throws Exception
*/
protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
- broker.sendToDeadLetterQueue(context, node);
+ broker.getRoot().sendToDeadLetterQueue(context, node);
}
public int getInFlightSize() {