You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/15 01:27:26 UTC
svn commit: r1350425 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Author: tabish
Date: Thu Jun 14 23:27:25 2012
New Revision: 1350425
URL: http://svn.apache.org/viewvc?rev=1350425&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3882
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1350425&r1=1350424&r2=1350425&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Thu Jun 14 23:27:25 2012
@@ -20,6 +20,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@@ -28,7 +29,20 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@@ -336,10 +350,12 @@ public class AdvisoryBroker extends Brok
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
super.slowConsumer(context, destination,subs);
try {
- ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
- fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
+ if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
+ ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
+ fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
+ }
} catch (Exception e) {
handleFireFailure("slow consumer", e);
}
@@ -349,10 +365,12 @@ public class AdvisoryBroker extends Brok
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
super.fastProducer(context, producerInfo);
try {
- ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
- fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
+ if (!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) {
+ ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
+ fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
+ }
} catch (Exception e) {
handleFireFailure("fast producer", e);
}