You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/15 01:27:26 UTC

svn commit: r1350425 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java

Author: tabish
Date: Thu Jun 14 23:27:25 2012
New Revision: 1350425

URL: http://svn.apache.org/viewvc?rev=1350425&view=rev
Log:
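fix for: https://issues.apache.org/jira/browse/AMQ-3882 - do not fire slow consumer or fast producer advisories when the destination is itself an advisory topic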

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1350425&r1=1350424&r2=1350425&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Thu Jun 14 23:27:25 2012
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.ConnectionContext;
@@ -28,7 +29,20 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -336,10 +350,12 @@ public class AdvisoryBroker extends Brok
     public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
         super.slowConsumer(context, destination,subs);
         try {
-            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
-            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-            advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
-            fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
+            if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
+                ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
+                fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
+            }
         } catch (Exception e) {
             handleFireFailure("slow consumer", e);
         }
@@ -349,10 +365,12 @@ public class AdvisoryBroker extends Brok
     public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
         super.fastProducer(context, producerInfo);
         try {
-            ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
-            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-            advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
-            fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
+            if (!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) {
+                ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
+                fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
+            }
         } catch (Exception e) {
             handleFireFailure("fast producer", e);
         }