You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/02/01 17:11:18 UTC
svn commit: r1239188 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/command/
main/java/org/apache/activemq/network/ test/java/org/apache/activemq/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Wed Feb 1 16:11:17 2012
New Revision: 1239188
URL: http://svn.apache.org/viewvc?rev=1239188&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3694 - Blocked/slow advisory consumers in a duplex network connector eventually break request/reply with temps. The duplex case was not acking advisory messages, so we were limited to 750! Also revisit the association of a producer-created temp destination with its connection, as the destination can still get deleted before an advisory arrives. The solution is to let gc pick up temps created in this way, see https://issues.apache.org/jira/browse/AMQ-2571. Resolve contention on destination creation for the producer/advisory race condition. Additional tests.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Wed Feb 1 16:11:17 2012
@@ -125,15 +125,15 @@ public abstract class AbstractRegion imp
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
boolean createIfTemporary) throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
- }
destinationsLock.writeLock().lock();
try {
Destination dest = destinations.get(destination);
if (dest == null) {
if (destination.isTemporary() == false || createIfTemporary) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
+ }
dest = createDestination(context, destination);
// intercept if there is a valid interceptor defined
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
@@ -222,7 +222,7 @@ public abstract class AbstractRegion imp
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Destination doesn't exist: " + dest);
+ LOG.debug("Cannot remove a destination that doesn't exist: " + destination);
}
}
} finally {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Feb 1 16:11:17 2012
@@ -65,6 +65,7 @@ import org.apache.activemq.filter.Boolea
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
@@ -574,6 +575,11 @@ public class Queue extends BaseDestinati
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
message.setRegionDestination(this);
+ ProducerState state = producerExchange.getProducerState();
+ if (state == null) {
+ LOG.warn("Send failed for: " + message + ", missing producer state for: " + producerExchange);
+ throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
+ }
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
@@ -1664,8 +1670,8 @@ public class Queue extends BaseDestinati
}finally {
consumersLock.readLock().unlock();
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Message " + msg.getMessageId() + " sent to " + this.destination);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination);
}
wakeup();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Feb 1 16:11:17 2012
@@ -288,7 +288,7 @@ public class RegionBroker extends EmptyB
}
@Override
- public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
+ public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
Destination answer;
@@ -297,6 +297,12 @@ public class RegionBroker extends EmptyB
return answer;
}
+ synchronized (destinations) {
+ answer = destinations.get(destination);
+ if (answer != null) {
+ return answer;
+ }
+
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
answer = queueRegion.addDestination(context, destination,true);
@@ -305,10 +311,10 @@ public class RegionBroker extends EmptyB
answer = topicRegion.addDestination(context, destination,true);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
- answer = tempQueueRegion.addDestination(context, destination,create);
+ answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
- answer = tempTopicRegion.addDestination(context, destination,create);
+ answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
break;
default:
throw createUnknownDestinationTypeException(destination);
@@ -316,6 +322,7 @@ public class RegionBroker extends EmptyB
destinations.put(destination, answer);
return answer;
+ }
}
@@ -374,22 +381,9 @@ public class RegionBroker extends EmptyB
if (destination != null) {
inactiveDestinationsPurgeLock.readLock().lock();
try {
- if (!destinations.containsKey(destination)) {
- // This seems to cause the destination to be added but without
- // advisories firing...
- context.getBroker().addDestination(context, destination, true);
- // associate it with the connection so that it can get deleted
- if (destination.isTemporary() && context.getConnectionState() != null) {
- DestinationInfo destinationInfo = new DestinationInfo(context.getConnectionId(),
- DestinationInfo.ADD_OPERATION_TYPE,
- destination);
- context.getConnectionState().addTempDestination(destinationInfo);
- if (LOG.isDebugEnabled()) {
- LOG.debug("assigning ownership of auto created temp : " + destination + " to connection:"
- + context.getConnectionId());
- }
- }
- }
+ // This seems to cause the destination to be added but without
+ // advisories firing...
+ context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Feb 1 16:11:17 2012
@@ -101,6 +101,7 @@ public class TopicSubscription extends A
} else {
//we are slow
if(!isSlowConsumer()) {
+ LOG.warn(toString() + ": has reached its prefetch limit without an ack, it appears to be slow");
setSlowConsumer(true);
for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java Wed Feb 1 16:11:17 2012
@@ -86,6 +86,13 @@ public class MessageAck extends BaseComm
this.messageCount = messageCount;
}
+ public MessageAck(Message message, byte ackType, int messageCount) {
+ this.ackType = ackType;
+ this.destination = message.getDestination();
+ this.lastMessageId = message.getMessageId();
+ this.messageCount = messageCount;
+ }
+
public void copy(MessageAck copy) {
super.copy(copy);
copy.firstMessageId = firstMessageId;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Feb 1 16:11:17 2012
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.network;
+import javax.jms.JMSException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@@ -368,11 +369,7 @@ public abstract class DemandForwardingBr
waitStarted();
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
- demandConsumerDispatched++;
- if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
- remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
- demandConsumerDispatched = 0;
- }
+ ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
lastConnectSucceeded.set(true);
remoteBrokerInfo = (BrokerInfo) command;
@@ -411,6 +408,7 @@ public abstract class DemandForwardingBr
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
|| AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
+ ackAdvisory(message);
} else {
if (!isPermissableDestination(message.getDestination(), true)) {
return;
@@ -431,6 +429,16 @@ public abstract class DemandForwardingBr
case ProducerInfo.DATA_STRUCTURE_TYPE:
localBroker.oneway(command);
break;
+ case MessageAck.DATA_STRUCTURE_TYPE:
+ MessageAck ack = (MessageAck) command;
+ DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
+ if (localSub != null) {
+ ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
+ localBroker.oneway(ack);
+ } else {
+ LOG.warn("Matching local subscription not found for ack: " + ack);
+ }
+ break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
localStartedLatch.await();
if (started.get()) {
@@ -480,6 +488,16 @@ public abstract class DemandForwardingBr
}
}
+ private void ackAdvisory(Message message) throws IOException {
+ demandConsumerDispatched++;
+ if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
+ MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
+ ack.setConsumerId(demandConsumerInfo.getConsumerId());
+ remoteBroker.oneway(ack);
+ demandConsumerDispatched = 0;
+ }
+ }
+
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
final int networkTTL = configuration.getNetworkTTL();
if (data.getClass() == ConsumerInfo.class) {
@@ -520,7 +538,7 @@ public abstract class DemandForwardingBr
synchronized (brokerService.getVmConnectorURI()) {
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
+ LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
}
} else {
if (LOG.isDebugEnabled()) {
@@ -555,7 +573,7 @@ public abstract class DemandForwardingBr
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
if (LOG.isTraceEnabled()) {
- LOG.trace(configuration.getBrokerName() +" bridging destination control command: " + destInfo);
+ LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
}
localBroker.oneway(destInfo);
} else if (data.getClass() == RemoveInfo.class) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Wed Feb 1 16:11:17 2012
@@ -114,6 +114,7 @@ public class JmsMultipleBrokersTestSuppo
uri = "static:(failover:(" + remoteURI + "))";
}
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+ connector.setName("to-" + remoteBroker.getBrokerName());
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
connector.setConduitSubscriptions(conduit);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java Wed Feb 1 16:11:17 2012
@@ -70,11 +70,12 @@ public class RequestReplyNoAdvisoryNetwo
" http://activemq.apache.org/schema/core" +
" http://activemq.apache.org/schema/core/activemq-core.xsd\">" +
" <broker xmlns=\"http://activemq.apache.org/schema/core\" id=\"broker\"" +
+ " allowTempAutoCreationOnSend=\"true\" schedulePeriodForDestinationPurge=\"1000\"" +
" brokerName=\"%HOST%\" persistent=\"false\" advisorySupport=\"false\" useJmx=\"false\" >" +
" <destinationPolicy>" +
" <policyMap>" +
" <policyEntries>" +
- " <policyEntry optimizedDispatch=\"true\">"+
+ " <policyEntry optimizedDispatch=\"true\" gcInactiveDestinations=\"true\" gcWithNetworkConsumers=\"true\" inactiveTimoutBeforeGC=\"1000\">"+
" <destination>"+
" <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName() + "\"/>" +
" </destination>" +
@@ -260,13 +261,14 @@ public class RequestReplyNoAdvisoryNetwo
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setSchedulePeriodForDestinationPurge(1000);
+ broker.setAllowTempAutoCreationOnSend(true);
PolicyMap map = new PolicyMap();
PolicyEntry tempReplyQPolicy = new PolicyEntry();
tempReplyQPolicy.setOptimizedDispatch(true);
tempReplyQPolicy.setGcInactiveDestinations(true);
tempReplyQPolicy.setGcWithNetworkConsumers(true);
- tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
+ tempReplyQPolicy.setInactiveTimoutBeforeGC(1000);
map.put(replyQWildcard, tempReplyQPolicy);
broker.setDestinationPolicy(map);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java Wed Feb 1 16:11:17 2012
@@ -36,11 +36,21 @@ import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +71,9 @@ public class TwoBrokerTempQueueAdvisoryT
public void testTemporaryQueueAdvisory() throws Exception {
LOG.info("Running testTemporaryQueueAdvisory()");
+ bridgeBrokers("BrokerA", "BrokerB");
+ bridgeBrokers("BrokerB", "BrokerA");
+
startAllBrokers();
waitForBridgeFormation();
waitForMinTopicRegionConsumerCount("BrokerB", 1);
@@ -93,6 +106,68 @@ public class TwoBrokerTempQueueAdvisoryT
}));
}
+ public boolean useDuplex = true;
+ public void initCombosForTestSendToRemovedTemp() {
+ addCombinationValues("useDuplex", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
+ }
+
+ public void testSendToRemovedTemp() throws Exception {
+
+ ActiveMQQueue requestReplyDest = new ActiveMQQueue("RequestReply");
+
+ NetworkConnector nc = bridgeBrokers("BrokerA", "BrokerB");
+ if (useDuplex) {
+ nc.setDuplex(true);
+ } else {
+ bridgeBrokers("BrokerB", "BrokerA");
+ }
+
+ // the destination advisory can lose the race with message dispatch, so replies on the networked broker
+ // need to work in the absence of an advisory; the destination will be cleaned up in the normal
+ // way
+ if (!useDuplex) {
+ brokers.get("BrokerB").broker.setAllowTempAutoCreationOnSend(true);
+ }
+
+ TransportConnector forClient = brokers.get("BrokerA").broker.addConnector("tcp://localhost:0");
+ startAllBrokers();
+ waitForBridgeFormation();
+ waitForMinTopicRegionConsumerCount("BrokerB", 1);
+ waitForMinTopicRegionConsumerCount("BrokerA", 1);
+
+ ConnectionFactory factory = new ActiveMQConnectionFactory(forClient.getConnectUri());
+ ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection();
+ conn.setWatchTopicAdvisories(false);
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ConnectionFactory replyFactory = getConnectionFactory("BrokerB");
+ for (int i = 0; i < 500; i++) {
+ TemporaryQueue tempDest = session.createTemporaryQueue();
+ MessageProducer producer = session.createProducer(requestReplyDest);
+ javax.jms.Message message = session.createTextMessage("req-" + i);
+ message.setJMSReplyTo(tempDest);
+
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(tempDest);
+ producer.send(message);
+
+ ActiveMQConnection replyConnection = (ActiveMQConnection) replyFactory.createConnection();
+ replyConnection.setWatchTopicAdvisories(false);
+ replyConnection.start();
+ Session replySession = replyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQMessageConsumer replyConsumer = (ActiveMQMessageConsumer) replySession.createConsumer(requestReplyDest);
+ javax.jms.Message msg = replyConsumer.receive(10000);
+ assertNotNull("request message not null: " + i, msg);
+ MessageProducer replyProducer = replySession.createProducer(msg.getJMSReplyTo());
+ replyProducer.send(session.createTextMessage("reply-" + i));
+ replyConnection.close();
+
+ javax.jms.Message reply = consumer.receive(10000);
+ assertNotNull("reply message : " + i + ", to: " + tempDest + ", by consumer:" + consumer.getConsumerId(), reply);
+ consumer.close();
+ tempDest.delete();
+ }
+ }
protected DestinationViewMBean createView(String broker, String destination, byte type) throws Exception {
String domain = "org.apache.activemq";
@@ -113,8 +188,9 @@ public class TwoBrokerTempQueueAdvisoryT
String options = new String("?persistent=false");
createBroker(new URI("broker:(tcp://localhost:0)/BrokerA" + options));
createBroker(new URI("broker:(tcp://localhost:0)/BrokerB" + options));
+ }
- bridgeBrokers("BrokerA", "BrokerB");
- bridgeBrokers("BrokerB", "BrokerA");
+ public static Test suite() {
+ return suite(TwoBrokerTempQueueAdvisoryTest.class);
}
}