You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/02/01 17:11:18 UTC

svn commit: r1239188 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Wed Feb  1 16:11:17 2012
New Revision: 1239188

URL: http://svn.apache.org/viewvc?rev=1239188&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3694 - Blocked/Slow advisory consumers in duplex network connector, eventually breaks request/reply with temps. The duplex case was not acking advisory messages, so we were limited to 750! Also revisit association of producer created temp with connection as this can still get deleted before an advisory. Solution is to let gc pick up temps created in this way, https://issues.apache.org/jira/browse/AMQ-2571. Resolve contention on destination creation for producer/advisory race condition. Additional tests

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Wed Feb  1 16:11:17 2012
@@ -125,15 +125,15 @@ public abstract class AbstractRegion imp
 
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
             boolean createIfTemporary) throws Exception {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
-        }
 
         destinationsLock.writeLock().lock();
         try {
             Destination dest = destinations.get(destination);
             if (dest == null) {
                 if (destination.isTemporary() == false || createIfTemporary) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
+                    }
                     dest = createDestination(context, destination);
                     // intercept if there is a valid interceptor defined
                     DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
@@ -222,7 +222,7 @@ public abstract class AbstractRegion imp
 
             } else {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Destination doesn't exist: " + dest);
+                    LOG.debug("Cannot remove a destination that doesn't exist: " + destination);
                 }
             }
         } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Feb  1 16:11:17 2012
@@ -65,6 +65,7 @@ import org.apache.activemq.filter.Boolea
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.Task;
@@ -574,6 +575,11 @@ public class Queue extends BaseDestinati
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         message.setRegionDestination(this);
+        ProducerState state = producerExchange.getProducerState();
+        if (state == null) {
+            LOG.warn("Send failed for: " + message + ",  missing producer state for: " + producerExchange);
+            throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
+        }
         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
                 && !context.isInRecoveryMode();
@@ -1664,8 +1670,8 @@ public class Queue extends BaseDestinati
         }finally {
             consumersLock.readLock().unlock();
         }
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Message " + msg.getMessageId() + " sent to " + this.destination);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(broker.getBrokerName() + " Message " + msg.getMessageId() + " sent to " + this.destination);
         }
         wakeup();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Feb  1 16:11:17 2012
@@ -288,7 +288,7 @@ public class RegionBroker extends EmptyB
     }
 
     @Override
-    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
+    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemp) throws Exception {
 
         Destination answer;
 
@@ -297,6 +297,12 @@ public class RegionBroker extends EmptyB
             return answer;
         }
 
+     synchronized (destinations) {
+        answer = destinations.get(destination);
+        if (answer != null) {
+            return answer;
+        }
+
         switch (destination.getDestinationType()) {
         case ActiveMQDestination.QUEUE_TYPE:
             answer = queueRegion.addDestination(context, destination,true);
@@ -305,10 +311,10 @@ public class RegionBroker extends EmptyB
             answer = topicRegion.addDestination(context, destination,true);
             break;
         case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            answer = tempQueueRegion.addDestination(context, destination,create);
+            answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
             break;
         case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            answer = tempTopicRegion.addDestination(context, destination,create);
+            answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
             break;
         default:
             throw createUnknownDestinationTypeException(destination);
@@ -316,6 +322,7 @@ public class RegionBroker extends EmptyB
 
         destinations.put(destination, answer);
         return answer;
+     }
 
     }
 
@@ -374,22 +381,9 @@ public class RegionBroker extends EmptyB
         if (destination != null) {
             inactiveDestinationsPurgeLock.readLock().lock();
             try {
-                if (!destinations.containsKey(destination)) {
-                    // This seems to cause the destination to be added but without
-                    // advisories firing...
-                    context.getBroker().addDestination(context, destination, true);
-                    // associate it with the connection so that it can get deleted
-                    if (destination.isTemporary() && context.getConnectionState() != null) {
-                        DestinationInfo destinationInfo = new DestinationInfo(context.getConnectionId(),
-                                DestinationInfo.ADD_OPERATION_TYPE,
-                                destination);
-                        context.getConnectionState().addTempDestination(destinationInfo);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("assigning ownership of auto created temp : " + destination + " to connection:"
-                                    + context.getConnectionId());
-                        }
-                    }
-                }
+                // This seems to cause the destination to be added but without
+                // advisories firing...
+                context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
                 switch (destination.getDestinationType()) {
                 case ActiveMQDestination.QUEUE_TYPE:
                     queueRegion.addProducer(context, info);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Feb  1 16:11:17 2012
@@ -101,6 +101,7 @@ public class TopicSubscription extends A
         } else {
             //we are slow
             if(!isSlowConsumer()) {
+                LOG.warn(toString() + ": has reached its prefetch limit without an ack, it appears to be slow");
                 setSlowConsumer(true);
                 for (Destination dest: destinations) {
                     dest.slowConsumer(getContext(), this);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java Wed Feb  1 16:11:17 2012
@@ -86,6 +86,13 @@ public class MessageAck extends BaseComm
         this.messageCount = messageCount;
     }
 
+    public MessageAck(Message message, byte ackType, int messageCount) {
+        this.ackType = ackType;
+        this.destination = message.getDestination();
+        this.lastMessageId = message.getMessageId();
+        this.messageCount = messageCount;
+    }
+
     public void copy(MessageAck copy) {
         super.copy(copy);
         copy.firstMessageId = firstMessageId;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Feb  1 16:11:17 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.network;
 
+import javax.jms.JMSException;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
@@ -368,11 +369,7 @@ public abstract class DemandForwardingBr
                     waitStarted();
                     MessageDispatch md = (MessageDispatch) command;
                     serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
-                    demandConsumerDispatched++;
-                    if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
-                        remoteBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
-                        demandConsumerDispatched = 0;
-                    }
+                    ackAdvisory(md.getMessage());
                 } else if (command.isBrokerInfo()) {
                     lastConnectSucceeded.set(true);
                     remoteBrokerInfo = (BrokerInfo) command;
@@ -411,6 +408,7 @@ public abstract class DemandForwardingBr
                             if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
                                 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
                                 serviceRemoteConsumerAdvisory(message.getDataStructure());
+                                ackAdvisory(message);
                             } else {
                                 if (!isPermissableDestination(message.getDestination(), true)) {
                                     return;
@@ -431,6 +429,16 @@ public abstract class DemandForwardingBr
                             case ProducerInfo.DATA_STRUCTURE_TYPE:
                                 localBroker.oneway(command);
                                 break;
+                            case MessageAck.DATA_STRUCTURE_TYPE:
+                                MessageAck ack = (MessageAck) command;
+                                DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
+                                if (localSub != null) {
+                                    ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
+                                    localBroker.oneway(ack);
+                                } else {
+                                    LOG.warn("Matching local subscription not found for ack: " + ack);
+                                }
+                                break;
                             case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                 localStartedLatch.await();
                                 if (started.get()) {
@@ -480,6 +488,16 @@ public abstract class DemandForwardingBr
         }
     }
 
+    private void ackAdvisory(Message message) throws IOException {
+        demandConsumerDispatched++;
+        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
+            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
+            ack.setConsumerId(demandConsumerInfo.getConsumerId());
+            remoteBroker.oneway(ack);
+            demandConsumerDispatched = 0;
+        }
+    }
+
     private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
         final int networkTTL = configuration.getNetworkTTL();
         if (data.getClass() == ConsumerInfo.class) {
@@ -520,7 +538,7 @@ public abstract class DemandForwardingBr
             synchronized (brokerService.getVmConnectorURI()) {
                 if (addConsumerInfo(info)) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
+                        LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
                     }
                 } else {
                     if (LOG.isDebugEnabled()) {
@@ -555,7 +573,7 @@ public abstract class DemandForwardingBr
             }
             destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
             if (LOG.isTraceEnabled()) {
-                LOG.trace(configuration.getBrokerName() +" bridging destination control command: " + destInfo);
+                LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
             }
             localBroker.oneway(destInfo);
         } else if (data.getClass() == RemoveInfo.class) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Wed Feb  1 16:11:17 2012
@@ -114,6 +114,7 @@ public class JmsMultipleBrokersTestSuppo
                 uri = "static:(failover:(" + remoteURI + "))";
             }
             NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+            connector.setName("to-" + remoteBroker.getBrokerName());
             connector.setDynamicOnly(dynamicOnly);
             connector.setNetworkTTL(networkTTL);
             connector.setConduitSubscriptions(conduit);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java Wed Feb  1 16:11:17 2012
@@ -70,11 +70,12 @@ public class RequestReplyNoAdvisoryNetwo
                 " http://activemq.apache.org/schema/core" +
                 " http://activemq.apache.org/schema/core/activemq-core.xsd\">" +
                 "  <broker xmlns=\"http://activemq.apache.org/schema/core\" id=\"broker\"" +
+                "    allowTempAutoCreationOnSend=\"true\" schedulePeriodForDestinationPurge=\"1000\"" +
                 "    brokerName=\"%HOST%\" persistent=\"false\" advisorySupport=\"false\" useJmx=\"false\" >" +
                 "   <destinationPolicy>" +
                 "    <policyMap>" +
                 "     <policyEntries>" +
-                "      <policyEntry optimizedDispatch=\"true\">"+
+                "      <policyEntry optimizedDispatch=\"true\"  gcInactiveDestinations=\"true\" gcWithNetworkConsumers=\"true\" inactiveTimoutBeforeGC=\"1000\">"+
                 "       <destination>"+
                 "        <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName() + "\"/>" +
                 "       </destination>" +
@@ -260,13 +261,14 @@ public class RequestReplyNoAdvisoryNetwo
         broker.setPersistent(false);
         broker.setUseJmx(false);
         broker.setSchedulePeriodForDestinationPurge(1000);
+        broker.setAllowTempAutoCreationOnSend(true);
 
         PolicyMap map = new PolicyMap();
         PolicyEntry tempReplyQPolicy = new PolicyEntry();
         tempReplyQPolicy.setOptimizedDispatch(true);
         tempReplyQPolicy.setGcInactiveDestinations(true);
         tempReplyQPolicy.setGcWithNetworkConsumers(true);
-        tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
+        tempReplyQPolicy.setInactiveTimoutBeforeGC(1000);
         map.put(replyQWildcard, tempReplyQPolicy);
         broker.setDestinationPolicy(map);
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java?rev=1239188&r1=1239187&r2=1239188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerTempQueueAdvisoryTest.java Wed Feb  1 16:11:17 2012
@@ -36,11 +36,21 @@ import java.net.URI;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
 import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +71,9 @@ public class TwoBrokerTempQueueAdvisoryT
     public void testTemporaryQueueAdvisory() throws Exception {
     	LOG.info("Running testTemporaryQueueAdvisory()");
 
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerA");
+
     	startAllBrokers();
         waitForBridgeFormation();
         waitForMinTopicRegionConsumerCount("BrokerB", 1);
@@ -93,6 +106,68 @@ public class TwoBrokerTempQueueAdvisoryT
         }));
     }
 
+    public boolean useDuplex = true;
+    public void initCombosForTestSendToRemovedTemp() {
+        addCombinationValues("useDuplex", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
+    }
+
+    public void testSendToRemovedTemp() throws Exception {
+
+        ActiveMQQueue requestReplyDest = new ActiveMQQueue("RequestReply");
+
+        NetworkConnector nc = bridgeBrokers("BrokerA", "BrokerB");
+        if (useDuplex) {
+            nc.setDuplex(true);
+        } else {
+            bridgeBrokers("BrokerB", "BrokerA");
+        }
+
+        // destination advisory can loose the race with message dispatch, so we need to allow replies on network broker
+        // to work in the absence of an advisory, the destination will be cleaned up in the normal
+        // way
+        if (!useDuplex) {
+            brokers.get("BrokerB").broker.setAllowTempAutoCreationOnSend(true);
+        }
+
+        TransportConnector forClient = brokers.get("BrokerA").broker.addConnector("tcp://localhost:0");
+        startAllBrokers();
+        waitForBridgeFormation();
+        waitForMinTopicRegionConsumerCount("BrokerB", 1);
+        waitForMinTopicRegionConsumerCount("BrokerA", 1);
+
+        ConnectionFactory factory = new ActiveMQConnectionFactory(forClient.getConnectUri());
+        ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection();
+        conn.setWatchTopicAdvisories(false);
+        conn.start();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ConnectionFactory replyFactory = getConnectionFactory("BrokerB");
+        for (int i = 0; i < 500; i++) {
+            TemporaryQueue tempDest = session.createTemporaryQueue();
+            MessageProducer producer = session.createProducer(requestReplyDest);
+            javax.jms.Message message = session.createTextMessage("req-" + i);
+            message.setJMSReplyTo(tempDest);
+
+            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(tempDest);
+            producer.send(message);
+
+            ActiveMQConnection replyConnection = (ActiveMQConnection) replyFactory.createConnection();
+            replyConnection.setWatchTopicAdvisories(false);
+            replyConnection.start();
+            Session replySession = replyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQMessageConsumer replyConsumer = (ActiveMQMessageConsumer) replySession.createConsumer(requestReplyDest);
+            javax.jms.Message msg = replyConsumer.receive(10000);
+            assertNotNull("request message not null: " + i, msg);
+            MessageProducer replyProducer = replySession.createProducer(msg.getJMSReplyTo());
+            replyProducer.send(session.createTextMessage("reply-" + i));
+            replyConnection.close();
+
+            javax.jms.Message reply = consumer.receive(10000);
+            assertNotNull("reply message : " + i + ", to: " + tempDest + ", by consumer:" + consumer.getConsumerId(), reply);
+            consumer.close();
+            tempDest.delete();
+        }
+    }
 
     protected DestinationViewMBean createView(String broker, String destination, byte type) throws Exception {
         String domain = "org.apache.activemq";
@@ -113,8 +188,9 @@ public class TwoBrokerTempQueueAdvisoryT
         String options = new String("?persistent=false");
         createBroker(new URI("broker:(tcp://localhost:0)/BrokerA" + options));
         createBroker(new URI("broker:(tcp://localhost:0)/BrokerB" + options));
+    }
 
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerA");
+    public static Test suite() {
+        return suite(TwoBrokerTempQueueAdvisoryTest.class);
     }
 }