You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/11/28 13:57:00 UTC
git commit: Added Fix and tests for https://issues.apache.org/jira/browse/AMQ-4906
Updated Branches:
refs/heads/trunk e3fed4b57 -> 11781d3cf
Added Fix and tests for https://issues.apache.org/jira/browse/AMQ-4906
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11781d3c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11781d3c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11781d3c
Branch: refs/heads/trunk
Commit: 11781d3cf2a351ea5f3ef80f29f351ac42d3538e
Parents: e3fed4b
Author: rajdavies <ra...@gmail.com>
Authored: Thu Nov 28 12:56:11 2013 +0000
Committer: rajdavies <ra...@gmail.com>
Committed: Thu Nov 28 12:56:11 2013 +0000
----------------------------------------------------------------------
.../activemq/advisory/AdvisoryBroker.java | 142 ++++++++-----------
.../activemq/advisory/ConsumerListenerTest.java | 14 ++
.../activemq/advisory/ProducerListenerTest.java | 35 +++--
3 files changed, 102 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index d68c5bd..5c90287 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -16,14 +16,12 @@
*/
package org.apache.activemq.advisory;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
@@ -36,21 +34,7 @@ import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.*;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@@ -72,7 +56,7 @@ public class AdvisoryBroker extends BrokerFilter {
protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>();
-
+
protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
@@ -111,7 +95,7 @@ public class AdvisoryBroker extends BrokerFilter {
// for this newly added consumer.
if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
// Replay the connections.
- for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
+ for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext(); ) {
ConnectionInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
fireAdvisory(context, topic, value, info.getConsumerId());
@@ -140,25 +124,25 @@ public class AdvisoryBroker extends BrokerFilter {
// Replay the producers.
if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
- for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
+ for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext(); ) {
ProducerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
- fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
+ fireProducerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
}
}
// Replay the consumers.
if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
- for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext();) {
+ for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) {
ConsumerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
- fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
+ fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId());
}
}
// Replay network bridges
if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
- for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) {
+ for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) {
BrokerInfo key = iter.next();
ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
fireAdvisory(context, topic, key, null, networkBridges.get(key));
@@ -181,12 +165,12 @@ public class AdvisoryBroker extends BrokerFilter {
}
@Override
- public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
- Destination answer = super.addDestination(context, destination,create);
+ public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
+ Destination answer = super.addDestination(context, destination, create);
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
DestinationInfo previous = destinations.putIfAbsent(destination, info);
- if( previous==null ) {
+ if (previous == null) {
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
fireAdvisory(context, topic, info);
}
@@ -201,7 +185,7 @@ public class AdvisoryBroker extends BrokerFilter {
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
DestinationInfo previous = destinations.putIfAbsent(destination, info);
- if( previous==null ) {
+ if (previous == null) {
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
fireAdvisory(context, topic, info);
}
@@ -220,7 +204,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
fireAdvisory(context, topic, info);
ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
- for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
+ for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
try {
next.removeDestination(context, advisoryDestination, -1);
} catch (Exception expectedIfDestinationDidNotExistYet) {
@@ -241,7 +225,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
fireAdvisory(context, topic, info);
ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination());
- for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
+ for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
try {
next.removeDestination(context, advisoryDestination, -1);
} catch (Exception expectedIfDestinationDidNotExistYet) {
@@ -269,7 +253,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
consumers.remove(info);
if (!dest.isTemporary() || destinations.containsKey(dest)) {
- fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
+ fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand());
}
}
}
@@ -277,7 +261,7 @@ public class AdvisoryBroker extends BrokerFilter {
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
- DurableTopicSubscription sub = ((TopicRegion)((RegionBroker)next).getTopicRegion()).getDurableSubscription(key);
+ DurableTopicSubscription sub = ((TopicRegion) ((RegionBroker) next).getTopicRegion()).getDurableSubscription(key);
super.removeSubscription(context, info);
@@ -305,8 +289,8 @@ public class AdvisoryBroker extends BrokerFilter {
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
producers.remove(info.getProducerId());
- if (!dest.isTemporary() || destinations.contains(dest)) {
- fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
+ if (!dest.isTemporary() || destinations.containsKey(dest)) {
+ fireProducerAdvisory(context, dest, topic, info.createRemoveCommand());
}
}
}
@@ -315,7 +299,7 @@ public class AdvisoryBroker extends BrokerFilter {
public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
super.messageExpired(context, messageReference, subscription);
try {
- if(!messageReference.isAdvisory()) {
+ if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
@@ -332,11 +316,11 @@ public class AdvisoryBroker extends BrokerFilter {
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
super.messageConsumed(context, messageReference);
try {
- if(!messageReference.isAdvisory()) {
+ if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
- fireAdvisory(context, topic,payload);
+ fireAdvisory(context, topic, payload);
}
} catch (Exception e) {
handleFireFailure("consumed", e);
@@ -351,7 +335,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
- fireAdvisory(context, topic,payload);
+ fireAdvisory(context, topic, payload);
}
} catch (Exception e) {
handleFireFailure("delivered", e);
@@ -368,7 +352,7 @@ public class AdvisoryBroker extends BrokerFilter {
payload.clearBody();
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
if (sub instanceof TopicSubscription) {
- advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded());
+ advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription) sub).discarded());
}
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
fireAdvisory(context, topic, payload, null, advisoryMessage);
@@ -379,8 +363,8 @@ public class AdvisoryBroker extends BrokerFilter {
}
@Override
- public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
- super.slowConsumer(context, destination,subs);
+ public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
+ super.slowConsumer(context, destination, subs);
try {
if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
@@ -394,7 +378,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
@Override
- public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
+ public void fastProducer(ConnectionContext context, ProducerInfo producerInfo, ActiveMQDestination destination) {
super.fastProducer(context, producerInfo, destination);
try {
if (!AdvisorySupport.isAdvisoryTopic(destination)) {
@@ -434,7 +418,7 @@ public class AdvisoryBroker extends BrokerFilter {
ConnectionContext context = new ConnectionContext();
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
context.setBroker(getBrokerService().getBroker());
- fireAdvisory(context, topic,null,null,advisoryMessage);
+ fireAdvisory(context, topic, null, null, advisoryMessage);
} catch (Exception e) {
handleFireFailure("now master broker", e);
}
@@ -446,11 +430,11 @@ public class AdvisoryBroker extends BrokerFilter {
boolean wasDLQd = super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
if (wasDLQd) {
try {
- if(!messageReference.isAdvisory()) {
+ if (!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
Message payload = messageReference.getMessage().copy();
payload.clearBody();
- fireAdvisory(context, topic,payload);
+ fireAdvisory(context, topic, payload);
}
} catch (Exception e) {
handleFireFailure("add to DLQ", e);
@@ -463,20 +447,20 @@ public class AdvisoryBroker extends BrokerFilter {
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
try {
- if (brokerInfo != null) {
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setBooleanProperty("started", true);
- advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
- advisoryMessage.setStringProperty("remoteIp", remoteIp);
- networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
-
- ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
- ConnectionContext context = new ConnectionContext();
- context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- context.setBroker(getBrokerService().getBroker());
- fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
- }
+ if (brokerInfo != null) {
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setBooleanProperty("started", true);
+ advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
+ advisoryMessage.setStringProperty("remoteIp", remoteIp);
+ networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
+
+ ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+ ConnectionContext context = new ConnectionContext();
+ context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+ context.setBroker(getBrokerService().getBroker());
+ fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+ }
} catch (Exception e) {
handleFireFailure("network bridge started", e);
}
@@ -485,18 +469,18 @@ public class AdvisoryBroker extends BrokerFilter {
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo) {
try {
- if (brokerInfo != null) {
- ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setBooleanProperty("started", false);
- networkBridges.remove(brokerInfo);
-
- ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
-
- ConnectionContext context = new ConnectionContext();
- context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
- context.setBroker(getBrokerService().getBroker());
- fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
- }
+ if (brokerInfo != null) {
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setBooleanProperty("started", false);
+ networkBridges.remove(brokerInfo);
+
+ ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+ ConnectionContext context = new ConnectionContext();
+ context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+ context.setBroker(getBrokerService().getBroker());
+ fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+ }
} catch (Exception e) {
handleFireFailure("network bridge stopped", e);
}
@@ -516,16 +500,16 @@ public class AdvisoryBroker extends BrokerFilter {
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
- protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
- fireConsumerAdvisory(context, consumerDestination,topic, command, null);
+ protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command) throws Exception {
+ fireConsumerAdvisory(context, consumerDestination, topic, command, null);
}
- protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+ protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
int count = 0;
- Set<Destination>set = getDestinations(consumerDestination);
+ Set<Destination> set = getDestinations(consumerDestination);
if (set != null) {
- for (Destination dest:set) {
+ for (Destination dest : set) {
count += dest.getDestinationStatistics().getConsumers().getCount();
}
}
@@ -534,11 +518,11 @@ public class AdvisoryBroker extends BrokerFilter {
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
- protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
- fireProducerAdvisory(context,producerDestination, topic, command, null);
+ protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
+ fireProducerAdvisory(context, producerDestination, topic, command, null);
}
- protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+ protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
int count = 0;
if (producerDestination != null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
index 72da777..2c5f9cd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ConsumerListenerTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -80,6 +81,19 @@ public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements C
assertConsumerEvent(0, false);
}
+ public void testConsumerEventsOnTemporaryDestination() throws Exception {
+
+ Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+ Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue();
+ consumerEventSource = new ConsumerEventSource(connection, dest);
+ consumerEventSource.setConsumerListener(this);
+ consumerEventSource.start();
+ MessageConsumer consumer = s.createConsumer(dest);
+ assertConsumerEvent(1,true);
+ consumer.close();
+ assertConsumerEvent(0,false);
+ }
+
public void onConsumerEvent(ConsumerEvent event) {
eventQueue.add(event);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/11781d3c/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
index bc90cab..dfa1b5e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
@@ -21,10 +21,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
-
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,18 +47,18 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P
producerEventSource.start();
consumerSession1 = createProducer();
- assertConsumerEvent(1, true);
+ assertProducerEvent(1, true);
consumerSession2 = createProducer();
- assertConsumerEvent(2, true);
+ assertProducerEvent(2, true);
consumerSession1.close();
consumerSession1 = null;
- assertConsumerEvent(1, false);
+ assertProducerEvent(1, false);
consumerSession2.close();
consumerSession2 = null;
- assertConsumerEvent(0, false);
+ assertProducerEvent(0, false);
}
public void testListenWhileAlreadyConsumersActive() throws Exception {
@@ -66,18 +66,33 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P
consumerSession2 = createProducer();
producerEventSource.start();
- assertConsumerEvent(2, true);
- assertConsumerEvent(2, true);
+ assertProducerEvent(2, true);
+ assertProducerEvent(2, true);
consumerSession1.close();
consumerSession1 = null;
- assertConsumerEvent(1, false);
+ assertProducerEvent(1, false);
consumerSession2.close();
consumerSession2 = null;
- assertConsumerEvent(0, false);
+ assertProducerEvent(0, false);
+ }
+
+ public void testConsumerEventsOnTemporaryDestination() throws Exception {
+
+ Session s = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+ Destination dest = useTopic ? s.createTemporaryTopic() : s.createTemporaryQueue();
+ producerEventSource = new ProducerEventSource(connection, dest);
+ producerEventSource.setProducerListener(this);
+ producerEventSource.start();
+ MessageProducer producer = s.createProducer(dest);
+ assertProducerEvent(1, true);
+ producer.close();
+ assertProducerEvent(0, false);
}
+
+
@Override
public void onProducerEvent(ProducerEvent event) {
eventQueue.add(event);
@@ -110,7 +125,7 @@ public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements P
super.tearDown();
}
- protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
+ protected void assertProducerEvent(int count, boolean started) throws InterruptedException {
ProducerEvent event = waitForProducerEvent();
assertEquals("Producer count", count, event.getProducerCount());
assertEquals("started", started, event.isStarted());