You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/11/28 13:57:00 UTC

git commit: Added Fix and tests for https://issues.apache.org/jira/browse/AMQ-4906

Updated Branches:
  refs/heads/trunk e3fed4b57 -> 11781d3cf


Added Fix and tests for https://issues.apache.org/jira/browse/AMQ-4906


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11781d3c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11781d3c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11781d3c

Branch: refs/heads/trunk
Commit: 11781d3cf2a351ea5f3ef80f29f351ac42d3538e
Parents: e3fed4b
Author: rajdavies <ra...@gmail.com>
Authored: Thu Nov 28 12:56:11 2013 +0000
Committer: rajdavies <ra...@gmail.com>
Committed: Thu Nov 28 12:56:11 2013 +0000

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       | 142 ++++++++-----------
 .../activemq/advisory/ConsumerListenerTest.java |  14 ++
 .../activemq/advisory/ProducerListenerTest.java |  35 +++--
 3 files changed, 102 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index d68c5bd..5c90287 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -16,14 +16,12 @@
  */
 package org.apache.activemq.advisory;
 
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
@@ -36,21 +34,7 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -72,7 +56,7 @@ public class AdvisoryBroker extends BrokerFilter {
     protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
 
     protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>();
-    
+
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@@ -111,7 +95,7 @@ public class AdvisoryBroker extends BrokerFilter {
             // for this newly added consumer.
             if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
                 // Replay the connections.
-                for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
+                for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) {
                     ConnectionInfo value = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
                     fireAdvisory(context, topic, value, info.getConsumerId());
@@ -140,25 +124,25 @@ public class AdvisoryBroker extends BrokerFilter {
 
             // Replay the producers.
             if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
-                for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
+                for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) {
                     ProducerInfo value = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
-                    fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
+                    fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
                 }
             }
 
             // Replay the consumers.
             if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
-                for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext();) {
+                for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) {
                     ConsumerInfo value = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
-                    fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
+                    fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
                 }
             }
 
             // Replay network bridges
             if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
-                for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) {
+                for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) {
                     BrokerInfo key = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
                     fireAdvisory(context, topic, key, null, networkBridges.get(key));
@@ -181,12 +165,12 @@ public class AdvisoryBroker extends BrokerFilter {
     }
 
     @Override
-    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
-        Destination answer = super.addDestination(context, destination,create);
+    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
+        Destination answer = super.addDestination(context, destination, create);
         if (!AdvisorySupport.isAdvisoryTopic(destination)) {
             DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
             DestinationInfo previous = destinations.putIfAbsent(destination, info);
-            if( previous==null ) {
+            if (previous == null) {
                 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
                 fireAdvisory(context, topic, info);
             }
@@ -201,7 +185,7 @@ public class AdvisoryBroker extends BrokerFilter {
 
         if (!AdvisorySupport.isAdvisoryTopic(destination)) {
             DestinationInfo previous = destinations.putIfAbsent(destination, info);
-            if( previous==null ) {
+            if (previous == null) {
                 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
                 fireAdvisory(context, topic, info);
             }
@@ -220,7 +204,7 @@ public class AdvisoryBroker extends BrokerFilter {
             ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
             fireAdvisory(context, topic, info);
             ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
-            for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
+            for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
                 try {
                     next.removeDestination(context, advisoryDestination, -1);
                 } catch (Exception expectedIfDestinationDidNotExistYet) {
@@ -241,7 +225,7 @@ public class AdvisoryBroker extends BrokerFilter {
             ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
             fireAdvisory(context, topic, info);
             ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination());
-            for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
+            for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
                 try {
                     next.removeDestination(context, advisoryDestination, -1);
                 } catch (Exception expectedIfDestinationDidNotExistYet) {
@@ -269,7 +253,7 @@ public class AdvisoryBroker extends BrokerFilter {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
             consumers.remove(info);
             if (!dest.isTemporary() || destinations.containsKey(dest)) {
-                fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
+                fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
             }
         }
     }
@@ -277,7 +261,7 @@ public class AdvisoryBroker extends BrokerFilter {
     @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
         SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-        DurableTopicSubscription sub = ((TopicRegion)((RegionBroker)next).getTopicRegion()).getDurableSubscription(key);
+        DurableTopicSubscription sub = ((TopicRegion) ((RegionBroker) next).getTopicRegion()).getDurableSubscription(key);
 
         super.removeSubscription(context, info);
 
@@ -305,8 +289,8 @@ public class AdvisoryBroker extends BrokerFilter {
         if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
             producers.remove(info.getProducerId());
-            if (!dest.isTemporary() || destinations.contains(dest)) {
-                fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
+            if (!dest.isTemporary() || destinations.containsKey(dest)) {
+                fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
             }
         }
     }
@@ -315,7 +299,7 @@ public class AdvisoryBroker extends BrokerFilter {
     public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
         super.messageExpired(context, messageReference, subscription);
         try {
-            if(!messageReference.isAdvisory()) {
+            if (!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
                 payload.clearBody();
@@ -332,11 +316,11 @@ public class AdvisoryBroker extends BrokerFilter {
     public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
         super.messageConsumed(context, messageReference);
         try {
-            if(!messageReference.isAdvisory()) {
+            if (!messageReference.isAdvisory()) {
                 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
                 payload.clearBody();
-                fireAdvisory(context, topic,payload);
+                fireAdvisory(context, topic, payload);
             }
         } catch (Exception e) {
             handleFireFailure("consumed", e);
@@ -351,7 +335,7 @@ public class AdvisoryBroker extends BrokerFilter {
                 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
                 Message payload = messageReference.getMessage().copy();
                 payload.clearBody();
-                fireAdvisory(context, topic,payload);
+                fireAdvisory(context, topic, payload);
             }
         } catch (Exception e) {
             handleFireFailure("delivered", e);
@@ -368,7 +352,7 @@ public class AdvisoryBroker extends BrokerFilter {
                 payload.clearBody();
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 if (sub instanceof TopicSubscription) {
-                    advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded());
+                    advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
                 }
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
                 fireAdvisory(context, topic, payload, null, advisoryMessage);
@@ -379,8 +363,8 @@ public class AdvisoryBroker extends BrokerFilter {
     }
 
     @Override
-    public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
-        super.slowConsumer(context, destination,subs);
+    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
+        super.slowConsumer(context, destination, subs);
         try {
             if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
                 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
@@ -394,7 +378,7 @@ public class AdvisoryBroker extends BrokerFilter {
     }
 
     @Override
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
+    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) {
         super.fastProducer(context, producerInfo, destination);
         try {
             if (!AdvisorySupport.isAdvisoryTopic(destination)) {
@@ -434,7 +418,7 @@ public class AdvisoryBroker extends BrokerFilter {
             ConnectionContext context = new ConnectionContext();
             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
             context.setBroker(getBrokerService().getBroker());
-            fireAdvisory(context, topic,null,null,advisoryMessage);
+            fireAdvisory(context, topic, null, null, advisoryMessage);
         } catch (Exception e) {
             handleFireFailure("now master broker", e);
         }
@@ -446,11 +430,11 @@ public class AdvisoryBroker extends BrokerFilter {
         boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
         if (wasDLQd) {
             try {
-                if(!messageReference.isAdvisory()) {
+                if (!messageReference.isAdvisory()) {
                     ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
                     Message payload = messageReference.getMessage().copy();
                     payload.clearBody();
-                    fireAdvisory(context, topic,payload);
+                    fireAdvisory(context, topic, payload);
                 }
             } catch (Exception e) {
                 handleFireFailure("add to DLQ", e);
@@ -463,20 +447,20 @@ public class AdvisoryBroker extends BrokerFilter {
     @Override
     public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
         try {
-         if (brokerInfo != null) {
-             ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-             advisoryMessage.setBooleanProperty("started", true);
-             advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
-             advisoryMessage.setStringProperty("remoteIp", remoteIp);
-             networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
-
-             ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
-             ConnectionContext context = new ConnectionContext();
-             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-             context.setBroker(getBrokerService().getBroker());
-             fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
-         }
+            if (brokerInfo != null) {
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                advisoryMessage.setBooleanProperty("started", true);
+                advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
+                advisoryMessage.setStringProperty("remoteIp", remoteIp);
+                networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
+
+                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+                ConnectionContext context = new ConnectionContext();
+                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+                context.setBroker(getBrokerService().getBroker());
+                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+            }
         } catch (Exception e) {
             handleFireFailure("network bridge started", e);
         }
@@ -485,18 +469,18 @@ public class AdvisoryBroker extends BrokerFilter {
     @Override
     public void networkBridgeStopped(BrokerInfo brokerInfo) {
         try {
-         if (brokerInfo != null) {
-             ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-             advisoryMessage.setBooleanProperty("started", false);
-             networkBridges.remove(brokerInfo);
-
-             ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
-             ConnectionContext context = new ConnectionContext();
-             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
-             context.setBroker(getBrokerService().getBroker());
-             fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
-         }
+            if (brokerInfo != null) {
+                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+                advisoryMessage.setBooleanProperty("started", false);
+                networkBridges.remove(brokerInfo);
+
+                ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+                ConnectionContext context = new ConnectionContext();
+                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+                context.setBroker(getBrokerService().getBroker());
+                fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+            }
         } catch (Exception e) {
             handleFireFailure("network bridge stopped", e);
         }
@@ -516,16 +500,16 @@ public class AdvisoryBroker extends BrokerFilter {
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
-    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
-        fireConsumerAdvisory(context, consumerDestination,topic, command, null);
+    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception {
+        fireConsumerAdvisory(context, consumerDestination, topic, command, null);
     }
 
-    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
         int count = 0;
-        Set<Destination>set = getDestinations(consumerDestination);
+        Set<Destination> set = getDestinations(consumerDestination);
         if (set != null) {
-            for (Destination dest:set) {
+            for (Destination dest : set) {
                 count += dest.getDestinationStatistics().getConsumers().getCount();
             }
         }
@@ -534,11 +518,11 @@ public class AdvisoryBroker extends BrokerFilter {
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
-    protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
-        fireProducerAdvisory(context,producerDestination, topic, command, null);
+    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
+        fireProducerAdvisory(context, producerDestination, topic, command, null);
     }
 
-    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
         int count = 0;
         if (producerDestination != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
index 72da777..2c5f9cd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -80,6 +81,19 @@ public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements C
         assertConsumerEvent(0, false);
     }
 
+    public void testConsumerEventsOnTemporaryDestination() throws Exception {
+
+        Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+        Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue();
+        consumerEventSource = new ConsumerEventSource(connection, dest);
+        consumerEventSource.setConsumerListener(this);
+        consumerEventSource.start();
+        MessageConsumer consumer = s.createConsumer(dest);
+        assertConsumerEvent(1,true);
+        consumer.close();
+        assertConsumerEvent(0,false);
+    }
+
     public void onConsumerEvent(ConsumerEvent event) {
         eventQueue.add(event);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
index bc90cab..dfa1b5e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
@@ -21,10 +21,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,18 +47,18 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P
         producerEventSource.start();
 
         consumerSession1 = createProducer();
-        assertConsumerEvent(1, true);
+        assertProducerEvent(1, true);
 
         consumerSession2 = createProducer();
-        assertConsumerEvent(2, true);
+        assertProducerEvent(2, true);
 
         consumerSession1.close();
         consumerSession1 = null;
-        assertConsumerEvent(1, false);
+        assertProducerEvent(1, false);
 
         consumerSession2.close();
         consumerSession2 = null;
-        assertConsumerEvent(0, false);
+        assertProducerEvent(0, false);
     }
 
     public void testListenWhileAlreadyConsumersActive() throws Exception {
@@ -66,18 +66,33 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P
         consumerSession2 = createProducer();
 
         producerEventSource.start();
-        assertConsumerEvent(2, true);
-        assertConsumerEvent(2, true);
+        assertProducerEvent(2, true);
+        assertProducerEvent(2, true);
 
         consumerSession1.close();
         consumerSession1 = null;
-        assertConsumerEvent(1, false);
+        assertProducerEvent(1, false);
 
         consumerSession2.close();
         consumerSession2 = null;
-        assertConsumerEvent(0, false);
+        assertProducerEvent(0, false);
+    }
+
+    public void testConsumerEventsOnTemporaryDestination() throws Exception {
+
+        Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+        Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue();
+        producerEventSource = new ProducerEventSource(connection, dest);
+        producerEventSource.setProducerListener(this);
+        producerEventSource.start();
+        MessageProducer producer = s.createProducer(dest);
+        assertProducerEvent(1, true);
+        producer.close();
+        assertProducerEvent(0, false);
     }
 
+
+
     @Override
     public void onProducerEvent(ProducerEvent event) {
         eventQueue.add(event);
@@ -110,7 +125,7 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P
         super.tearDown();
     }
 
-    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
+    protected void assertProducerEvent(int count, boolean started) throws InterruptedException {
         ProducerEvent event = waitForProducerEvent();
         assertEquals("Producer count", count, event.getProducerCount());
         assertEquals("started", started, event.isStarted());