You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/16 08:23:15 UTC

svn commit: r1326502 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java test/java/org/apache/activemq/bugs/AMQ3274Test.java

Author: rajdavies
Date: Mon Apr 16 06:23:14 2012
New Revision: 1326502

URL: http://svn.apache.org/viewvc?rev=1326502&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3274

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1326502&r1=1326501&r2=1326502&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Apr 16 06:23:14 2012
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.ObjectName;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
@@ -38,35 +37,11 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.region.AbstractRegion;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.NetworkBridgeFilter;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.thread.DefaultThreadPools;
@@ -1011,7 +986,7 @@ public abstract class DemandForwardingBr
 
         List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
         Collection<Subscription> currentSubs =
-            getRegionSubscriptions(consumerInfo.getDestination().isTopic());
+            getRegionSubscriptions(consumerInfo.getDestination());
         for (Subscription sub : currentSubs) {
             List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
             if (!networkConsumers.isEmpty()) {
@@ -1079,11 +1054,37 @@ public abstract class DemandForwardingBr
         return found;
     }
 
-    private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
-        RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
-        AbstractRegion abstractRegion = (AbstractRegion)
-            (isTopic ? region.getTopicRegion() : region.getQueueRegion());
-        return abstractRegion.getSubscriptions().values();
+    private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
+        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
+        Region region;
+        Collection<Subscription> subs;
+
+        region = null;
+        switch ( dest.getDestinationType() )
+        {
+            case ActiveMQDestination.QUEUE_TYPE:
+                region = region_broker.getQueueRegion();
+                break;
+
+            case ActiveMQDestination.TOPIC_TYPE:
+                region = region_broker.getTopicRegion();
+                break;
+
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                region = region_broker.getTempQueueRegion();
+                break;
+
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                region = region_broker.getTempTopicRegion();
+                break;
+        }
+
+        if ( region instanceof AbstractRegion )
+            subs = ((AbstractRegion) region).getSubscriptions().values();
+        else
+            subs = null;
+
+        return subs;
     }
 
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java?rev=1326502&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java Mon Apr 16 06:23:14 2012
@@ -0,0 +1,975 @@
+/*
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.junit.*;
+import static org.junit.Assert.*;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
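+/**
+ * Regression test for AMQ-3274: request/reply over permanent and temporary queues and topics between two networked embedded brokers.
+ */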
+public class AMQ3274Test
+{
+	protected static int		Next_broker_num = 0;   // Sequence used to number the embedded brokers
+	protected EmbeddedTcpBroker	broker1;
+	protected EmbeddedTcpBroker	broker2;
+
+	protected int			nextEchoId = 0;         // Suffix of the next "echo.queue.N" name
+	protected volatile boolean	testError = false;  // Written by consumer threads, read by the test thread
+
+	protected int			echoResponseFill = 0;   // Number of "filler" response messages per request
+
+	protected static Log		LOG;
+
+	static
+	{
+		LOG = LogFactory.getLog(AMQ3274Test.class);
+	}
+
+	/** Creates the two embedded brokers and links each one to the other with duplex connectors. */
+	public AMQ3274Test () throws Exception
+	{
+		this.broker1 = new EmbeddedTcpBroker();
+		this.broker2 = new EmbeddedTcpBroker();
+
+		this.broker1.coreConnectTo(this.broker2, true);
+		this.broker2.coreConnectTo(this.broker1, true);
+	}
+
+	/** Writes {@code msg} as one line on standard output and flushes the stream right away. */
+	public void logMessage (String msg) {
+		final java.io.PrintStream stdout = System.out;
+		stdout.println(msg);
+		stdout.flush();
+	}
+
+
+	/**
+	 * Starts a MessageClient consuming from resp_dest, sends num_msg text requests through req_prod with resp_dest as JMSReplyTo (the last one flagged "end-of-response"), waits for the client to finish and sets testError when the received count differs from num_msg * (echoResponseFill + 1).
+	 */
+
+	public void testMessages (Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg)
+	throws Exception
+	{
+		MessageConsumer resp_cons;
+		TextMessage		msg;
+		MessageClient	cons_client;
+		int				cur;
+		int				tot_expected;
+
+		resp_cons = sess.createConsumer(resp_dest);
+
+		cons_client = new MessageClient(resp_cons, num_msg);
+		cons_client.start();
+
+		cur = 0;
+		while ( ( cur < num_msg ) && ( ! testError ) )
+		{
+			msg = sess.createTextMessage("MSG AAAA " + cur);
+			msg.setIntProperty("SEQ", 100 + cur);
+			msg.setStringProperty("TEST", "TOPO");
+			msg.setJMSReplyTo(resp_dest);
+
+			if ( cur == ( num_msg - 1 ) )
+				msg.setBooleanProperty("end-of-response", true);
+
+			req_prod.send(msg);
+
+			cur++;
+		}
+
+			//
+			// Give the consumer some time to receive the response.
+			//
+		cons_client.waitShutdown(5000);
+
+			//
+			// Now shutdown the consumer if it's still running.
+			//
+		if ( cons_client.shutdown() )
+			LOG.debug("Consumer client shutdown complete");
+		else
+			LOG.debug("Consumer client shutdown incomplete!!!");
+
+
+			//
+			// Check that the correct number of messages was received.
+			//
+		tot_expected = num_msg * ( echoResponseFill + 1 );
+
+		if ( cons_client.getNumMsgReceived() == tot_expected )
+		{
+			LOG.info("Have " + tot_expected + " messages, as-expected");
+		}
+		else
+		{
+			testError = true;
+			LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
+		}
+
+		resp_cons.close();
+	}
+
+
+	/**
+	 * Test one destination between the given "producer broker" and "consumer broker" specified.
+	 * An echo service is started against the producer broker, requests are sent to its queue through
+	 * {@code sess}, and the echoed responses are expected on {@code cons_dest}.
+	 *
+	 * @param conn            connection used to remove a stale echo queue before the run
+	 * @param sess            session used for the request producer and the response consumer
+	 * @param cons_dest       destination on which the responses are consumed
+	 * @param prod_broker_url URL of the broker the echo service connects to
+	 * @param cons_broker_url URL of the consumer broker (not used by this method)
+	 * @param num_msg         number of request messages to send
+	 * @throws Exception on any JMS failure
+	 */
+	public void testOneDest (Connection conn, Session sess, Destination cons_dest, String prod_broker_url,
+	                         String cons_broker_url, int num_msg)
+	throws Exception
+	{
+		int			echo_id;
+		EchoService		echo_svc;
+		String			echo_queue_name;
+		MessageProducer		msg_prod = null;
+
+		synchronized ( this )
+		{
+			echo_id = this.nextEchoId;
+			this.nextEchoId++;
+		}
+
+		echo_queue_name = "echo.queue." + echo_id;
+
+			// Remove any previously-created echo queue with the same name.
+		LOG.trace("destroying the echo queue in case an old one exists");
+		removeQueue(conn, echo_queue_name);
+
+			// Now start the echo service with that queue.
+		echo_svc = new EchoService(echo_queue_name, prod_broker_url);
+		echo_svc.start();
+
+		try
+		{
+				// Create the Producer to the echo request Queue
+			LOG.trace("Creating echo queue and producer");
+			Destination prod_dest = sess.createQueue(echo_queue_name);
+			msg_prod = sess.createProducer(prod_dest);
+
+				// Pass messages around.
+			testMessages(sess, msg_prod, cons_dest, num_msg);
+		}
+		finally
+		{
+				// Always stop the echo thread and release the producer, even when the exchange failed.
+			echo_svc.shutdown();
+
+			if ( msg_prod != null )
+				msg_prod.close();
+		}
+	}
+
+
+	/**
+	 * TEST TEMPORARY TOPICS
+	 * <p>
+	 * Sends 5 requests to an echo service on the producer broker and expects the replies on a temporary topic.
+	 *
+	 * @param prod_broker_url URL of the broker the echo service connects to
+	 * @param cons_broker_url URL of the broker this test connects to for sending and receiving
+	 * @throws Exception on any JMS failure
+	 */
+	public void testTempTopic (String prod_broker_url, String cons_broker_url)
+	throws Exception
+	{
+		Connection		conn;
+		Session			sess;
+		Destination		cons_dest;
+		int			num_msg;
+
+		num_msg = 5;
+
+		LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+		         " messages)");
+
+			// Connect to the bus.
+		conn = createConnection(cons_broker_url);
+
+		try
+		{
+			conn.start();
+			sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+				// Create the destination on which messages are being tested.
+			LOG.trace("Creating destination");
+			cons_dest = sess.createTemporaryTopic();
+
+			testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+			sess.close();
+		}
+		finally
+		{
+				// Cleanup: closing the connection also closes its session if the test failed part-way.
+			conn.close();
+		}
+	}
+
+
+	/**
+	 * TEST TOPICS
+	 * <p>
+	 * Sends 5 requests to an echo service on the producer broker and expects the replies on a permanent topic.
+	 *
+	 * @param prod_broker_url URL of the broker the echo service connects to
+	 * @param cons_broker_url URL of the broker this test connects to for sending and receiving
+	 * @throws Exception on any JMS failure
+	 */
+	public void testTopic (String prod_broker_url, String cons_broker_url)
+	throws Exception
+	{
+		int				num_msg;
+		Connection		conn;
+		Session			sess;
+		String			topic_name;
+		Destination		cons_dest;
+
+		num_msg = 5;
+
+		LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+		         " messages)");
+
+			// Connect to the bus.
+		conn = createConnection(cons_broker_url);
+
+		try
+		{
+			conn.start();
+			sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+				// Create the destination on which messages are being tested.
+			topic_name = "topotest2.perm.topic";
+			LOG.trace("Removing existing Topic");
+			removeTopic(conn, topic_name);
+			LOG.trace("Creating Topic, " + topic_name);
+			cons_dest = sess.createTopic(topic_name);
+
+			testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+				// Cleanup
+			removeTopic(conn, topic_name);
+			sess.close();
+		}
+		finally
+		{
+				// Closing the connection also closes its session if the test failed part-way.
+			conn.close();
+		}
+	}
+
+
+	/**
+	 * TEST TEMPORARY QUEUES
+	 * <p>
+	 * Sends 5 requests to an echo service on the producer broker and expects the replies on a temporary queue.
+	 *
+	 * @param prod_broker_url URL of the broker the echo service connects to
+	 * @param cons_broker_url URL of the broker this test connects to for sending and receiving
+	 * @throws Exception on any JMS failure
+	 */
+	public void testTempQueue (String prod_broker_url, String cons_broker_url)
+	throws Exception
+	{
+		int		num_msg;
+
+		Connection	conn;
+		Session		sess;
+
+		Destination	cons_dest;
+
+		num_msg = 5;
+
+		LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+		         " messages)");
+
+			// Connect to the bus.
+		conn = createConnection(cons_broker_url);
+
+		try
+		{
+			conn.start();
+			sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+				// Create the destination on which messages are being tested.
+			LOG.trace("Creating destination");
+			cons_dest = sess.createTemporaryQueue();
+
+			testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+			sess.close();
+		}
+		finally
+		{
+				// Cleanup: closing the connection also closes its session if the test failed part-way.
+			conn.close();
+		}
+	}
+
+
+	/**
+	 * TEST QUEUES
+	 * <p>
+	 * Sends 5 requests to an echo service on the producer broker and expects the replies on a permanent queue.
+	 *
+	 * @param prod_broker_url URL of the broker the echo service connects to
+	 * @param cons_broker_url URL of the broker this test connects to for sending and receiving
+	 * @throws Exception on any JMS failure
+	 */
+	public void testQueue (String prod_broker_url, String cons_broker_url)
+	throws Exception
+	{
+		int				num_msg;
+		Connection		conn;
+		Session			sess;
+		String			queue_name;
+		Destination		cons_dest;
+
+		num_msg = 5;
+
+		LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+		         " messages)");
+
+			// Connect to the bus.
+		conn = createConnection(cons_broker_url);
+
+		try
+		{
+			conn.start();
+			sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+				// Create the destination on which messages are being tested.
+			queue_name = "topotest2.perm.queue";
+			LOG.trace("Removing existing Queue");
+			removeQueue(conn, queue_name);
+			LOG.trace("Creating Queue, " + queue_name);
+			cons_dest = sess.createQueue(queue_name);
+
+			testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+				// Cleanup
+			removeQueue(conn, queue_name);
+			sess.close();
+		}
+		finally
+		{
+				// Closing the connection also closes its session if the test failed part-way.
+			conn.close();
+		}
+	}
+
+	/**
+	 * Starts both brokers, runs the temp-topic, temp-queue, topic and queue scenarios in that order
+	 * (skipping the rest once an error is recorded), stops the brokers and asserts that no error occurred.
+	 */
+	@Test
+	public void run () throws Exception
+	{
+		testError = false;
+
+			// Use threads to avoid startup deadlock since the first broker started waits until
+			//	it knows the name of the remote broker before finishing its startup, which means
+			//	the remote must already be running.
+		Thread start1 = new Thread() {
+			public void run()
+			{
+				try {
+					broker1.start();
+				} catch (Exception ex) {
+					LOG.error(null, ex);
+					testError = true;	// a broker that failed to start must fail the test
+				}
+			}
+		};
+		Thread start2 = new Thread() {
+			public void run()
+			{
+				try {
+					broker2.start();
+				} catch (Exception ex) {
+					LOG.error(null, ex);
+					testError = true;	// a broker that failed to start must fail the test
+				}
+			}
+		};
+
+		try
+		{
+			start1.start();
+			start2.start();
+			start1.join();
+			start2.join();
+
+			if ( ! testError )
+				this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+			if ( ! testError )
+				this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+			if ( ! testError )
+				this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+			if ( ! testError )
+				this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+			Thread.sleep(100);
+		}
+		finally
+		{
+			shutdown();		// stop the brokers even when a scenario threw
+		}
+
+		assertFalse("errors were recorded during the test run", testError);
+	}
+
+	public void shutdown ()
+	throws Exception
+	{
+		broker1.stop();
+		broker2.stop();
+	}
+
+	/**
+	 * Runs the test outside of a JUnit runner and exits with a non-zero status when it throws.
+	 *
+	 * @param args the command line arguments (ignored)
+	 */
+	public static void main(String[] args)
+	{
+		try
+		{
+			AMQ3274Test main_obj = new AMQ3274Test();
+			main_obj.run();
+		}
+		catch (Exception ex)
+		{
+			ex.printStackTrace();
+			LOG.error(null, ex);
+
+				// A failed run must not report success to the calling shell or script.
+			System.exit(1);
+		}
+	}
+
+	/** Opens a JMS connection to the broker at {@code url} through ActiveMQConnection.makeConnection. */
+	protected Connection	createConnection (String url) throws Exception
+	{
+		return	ActiveMQConnection.makeConnection(url);
+	}
+
+	/** Destroys the queue named {@code dest_name}; does nothing unless {@code conn} is an ActiveMQConnection. */
+	protected static void	removeQueue (Connection conn, String dest_name)
+	throws java.lang.Exception
+	{
+		if ( ! ( conn instanceof ActiveMQConnection ) )
+			return;
+
+		ActiveMQDestination	queue = ActiveMQDestination.createDestination(dest_name, ActiveMQDestination.QUEUE_TYPE);
+		ActiveMQConnection	amq_conn = (ActiveMQConnection) conn;
+
+		amq_conn.destroyDestination(queue);
+	}
+
+	/** Destroys the topic named {@code dest_name}; does nothing unless {@code conn} is an ActiveMQConnection. */
+	protected static void	removeTopic (Connection conn, String dest_name)
+	throws java.lang.Exception
+	{
+		if ( ! ( conn instanceof ActiveMQConnection ) )
+			return;
+
+		ActiveMQDestination	topic = ActiveMQDestination.createDestination(dest_name, ActiveMQDestination.TOPIC_TYPE);
+		ActiveMQConnection	amq_conn = (ActiveMQConnection) conn;
+
+		amq_conn.destroyDestination(topic);
+	}
+
+	/**
+	 * Formats a message for logging: the text of a TextMessage (or the bracketed class name of any
+	 * other message type) followed by "; name=value" for every message property.
+	 *
+	 * @param msg message to describe
+	 * @return the one-line description
+	 * @throws Exception if the message text or a property cannot be read
+	 */
+	public static String fmtMsgInfo (Message msg)
+	throws Exception
+	{
+		StringBuilder		msg_desc = new StringBuilder();
+		Enumeration<?>		prop_enum;
+
+		if ( msg instanceof TextMessage )
+		{
+			msg_desc.append(((TextMessage) msg).getText());
+		}
+		else
+		{
+			msg_desc.append("[").append(msg.getClass().getName()).append("]");
+		}
+
+			// Every property value is rendered through getStringProperty.
+		prop_enum = msg.getPropertyNames();
+		while ( prop_enum.hasMoreElements() )
+		{
+			String prop = (String) prop_enum.nextElement();
+			msg_desc.append("; ").append(prop).append("=").append(msg.getStringProperty(prop));
+		}
+
+		return	msg_desc.toString();
+	}
+
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+/////////////////////////////////////////////////  INTERNAL CLASSES  /////////////////////////////////////////////////
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+	protected class EmbeddedTcpBroker
+	{	// Non-persistent BrokerService with JMX disabled, listening on its own 127.0.0.1 TCP port.
+		protected BrokerService		brokerSvc;
+		protected int			brokerNum;	// taken from Next_broker_num
+		protected String		brokerName;	// "broker" + brokerNum
+		protected String		brokerId;	// "b" + brokerNum
+		protected int			port;		// 60000 + brokerNum * 10
+		protected String		tcpUrl;
+		/** Configures (but does not start) the broker and adds its TCP transport connector. */
+		public EmbeddedTcpBroker ()
+		throws Exception
+		{
+			brokerSvc = new BrokerService();
+
+			synchronized ( this.getClass() )
+			{
+				brokerNum = Next_broker_num;
+				Next_broker_num++;
+			}
+
+			brokerName = "broker" + brokerNum;
+			brokerId = "b" + brokerNum;
+
+			brokerSvc.setBrokerName(brokerName);
+			brokerSvc.setBrokerId(brokerId);
+
+			brokerSvc.setPersistent(false);
+			brokerSvc.setUseJmx(false); // TBD
+
+			port = 60000 + ( brokerNum * 10 );
+
+				// Configure the transport connector (TCP)
+			tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
+			brokerSvc.addConnector(tcpUrl);
+		}
+		/** Opens a new client connection to this broker's TCP URL. */
+		public Connection	createConnection ()
+		throws URISyntaxException, JMSException
+		{
+			Connection	result;
+
+			result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
+
+			return	result;
+		}
+		/** Returns the tcp:// URL of this broker's transport connector. */
+		public String	getConnectionUrl ()
+		{
+			return	this.tcpUrl;
+		}
+
+
+		/**
+		 * Create network connections to the given broker using the network-connector
+		 * configuration of CORE brokers (e.g. core1.bus.dev1.coresys.tmcs)
+		 *
+		 * @param other     broker to connect to; one "queue" and one "topic" connector are added
+		 * @param duplex_f  true to make both connectors duplex
+		 */
+		public void coreConnectTo (EmbeddedTcpBroker other, boolean duplex_f)
+		throws Exception
+		{
+			this.makeConnectionTo(other, duplex_f, true);
+			this.makeConnectionTo(other, duplex_f, false);
+		}
+		/** Starts the underlying BrokerService. */
+		public void start ()
+		throws Exception
+		{
+			brokerSvc.start();
+			//brokerSvc.waitUntilStarted();
+		}
+		/** Stops the underlying BrokerService. */
+		public void stop ()
+		throws Exception
+		{
+			brokerSvc.stop();
+		}
+
+
+		/**
+		 * Make one connection to the other embedded broker, of the specified type (queue or topic)
+		 * using the standard CORE broker networking.
+		 * 
+		 * @param other     broker the connector points at
+		 * @param duplex_f  whether the connector is duplex
+		 * @param queue_f   true for the "queue" connector (no conduit subscriptions), false for "topic"
+		 * @throws Exception if the connector URI is invalid or the connector cannot be added
+		 */
+		protected void	makeConnectionTo (EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f)
+		throws Exception
+		{
+			NetworkConnector	nw_conn;
+			String				prefix;
+			ActiveMQDestination excl_dest;
+			ArrayList<ActiveMQDestination>	excludes;
+
+			nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
+			nw_conn.setDuplex(duplex_f);
+
+			if ( queue_f )
+				nw_conn.setConduitSubscriptions(false);
+			else
+				nw_conn.setConduitSubscriptions(true);
+
+			nw_conn.setNetworkTTL(5);
+			nw_conn.setSuppressDuplicateQueueSubscriptions(true);
+			nw_conn.setDecreaseNetworkConsumerPriority(true);
+			nw_conn.setBridgeTempDestinations(true);
+				// NOTE(review): the "queue" connector excludes every queue (">") and the "topic" connector every topic - confirm this pairing is intended.
+			if ( queue_f )
+			{
+				prefix = "queue";
+				excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+			}
+			else
+			{
+				prefix = "topic";
+				excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+			}
+
+			excludes = new ArrayList<ActiveMQDestination>();
+			excludes.add(excl_dest);
+			nw_conn.setExcludedDestinations(excludes);
+
+			if ( duplex_f )
+				nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
+			else
+				nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
+
+			brokerSvc.addNetworkConnector(nw_conn);
+		}
+	}
+
+	protected class MessageClient extends java.lang.Thread
+	{
+		protected MessageConsumer	msgCons;
+		protected boolean		shutdownInd;
+		protected int			expectedCount;
+		protected int			lastSeq = 0;
+		protected int			msgCount = 0;
+		protected boolean		haveFirstSeq;
+		protected CountDownLatch	shutdownLatch;
+
+		public MessageClient (MessageConsumer cons, int num_to_expect)
+		{
+			msgCons = cons;
+			expectedCount = ( num_to_expect * ( echoResponseFill + 1 ) );
+			shutdownLatch = new CountDownLatch(1);
+		}
+
+		public void run ()
+		{
+			CountDownLatch	latch;
+
+			try
+			{
+				synchronized ( this )
+				{
+					latch = shutdownLatch;
+				}
+
+				shutdownInd = false;
+				processMessages();
+
+				latch.countDown();
+			}
+			catch ( Exception exc )
+			{
+				LOG.error("message client error", exc);
+			}
+		}
+
+		public void waitShutdown (long timeout)
+		{
+			CountDownLatch	latch;
+
+			try
+			{
+				synchronized ( this )
+				{
+					latch = shutdownLatch;
+				}
+
+				if ( latch != null )
+					latch.await(timeout, TimeUnit.MILLISECONDS);
+				else
+					LOG.info("echo client shutdown: client does not appear to be active");
+			}
+			catch ( InterruptedException int_exc )
+			{
+				LOG.warn("wait for message client shutdown interrupted", int_exc);
+			}
+		}
+
+		public boolean shutdown ()
+		{
+			boolean down_ind;
+
+			if ( ! shutdownInd )
+			{
+				shutdownInd = true;
+			}
+
+			waitShutdown(200);
+
+			synchronized ( this )
+			{
+				if ( ( shutdownLatch == null ) || ( shutdownLatch.getCount() == 0 ) )
+					down_ind = true;
+				else
+					down_ind = false;
+			}
+
+			return	down_ind;
+		}
+
+		public int	getNumMsgReceived ()
+		{
+			return	msgCount;
+		}
+
+		protected void processMessages ()
+		throws Exception
+		{
+			Message in_msg;
+
+			haveFirstSeq = false;
+
+				//
+				// Stop at shutdown time or after any test error is detected.
+				//
+
+			while ( ( ! shutdownInd ) && ( ! testError ) )
+			{
+				in_msg = msgCons.receive(100);
+
+				if ( in_msg != null )
+				{
+					msgCount++;
+					checkMessage(in_msg);
+				}
+			}
+		}
+
+		protected void	checkMessage (Message in_msg)
+		throws Exception
+		{
+			int				seq;
+
+			LOG.debug("received message " + fmtMsgInfo(in_msg));
+
+				//
+				// Only check messages with a sequence number.
+				//
+
+			if ( in_msg.propertyExists("SEQ") )
+			{
+				seq = in_msg.getIntProperty("SEQ");
+
+				if ( ( haveFirstSeq ) && ( seq != ( lastSeq + 1 ) ) )
+				{
+					LOG.error("***ERROR*** incorrect sequence number; expected " +
+					          Integer.toString(lastSeq + 1) + " but have " +
+					          Integer.toString(seq));
+
+					testError = true;
+				}
+
+				lastSeq = seq;
+
+				if ( msgCount > expectedCount )
+				{
+					LOG.warn("*** have more messages than expected; have "	+ msgCount +
+					         "; expect " + expectedCount);
+
+					testError = true;
+				}
+			}
+
+			if ( in_msg.propertyExists("end-of-response") )
+			{
+				LOG.trace("received end-of-response message");
+				shutdownInd = true;
+			}
+		}
+	}
+
+	/**
+	 * Thread that consumes requests from a queue and re-sends each one to its JMSReplyTo destination; it owns its connection and closes it when the thread ends.
+	 */
+	protected class EchoService extends java.lang.Thread
+	{
+		protected String		destName;
+		protected Connection		jmsConn;
+		protected Session		sess;
+		protected MessageConsumer	msg_cons;
+		protected volatile boolean	Shutdown_ind;	// set by shutdown() from another thread
+
+		protected Destination		req_dest;
+		protected Destination		resp_dest;
+		protected MessageProducer	msg_prod;
+
+		protected CountDownLatch	waitShutdown;
+		/** Creates the request-queue consumer on broker_conn and starts the connection; the thread is not started. */
+		public EchoService (String dest, Connection broker_conn)
+		throws Exception
+		{
+			destName = dest;
+			jmsConn = broker_conn;
+
+			Shutdown_ind = false;
+
+			sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			req_dest = sess.createQueue(destName);
+			msg_cons = sess.createConsumer(req_dest);
+
+			jmsConn.start();
+
+			waitShutdown = new CountDownLatch(1);
+		}
+		/** Same as the other constructor, using a new connection to the broker at broker_url. */
+		public EchoService (String dest, String broker_url)
+		throws Exception
+		{
+			this(dest, ActiveMQConnection.makeConnection(broker_url));
+		}
+		/** Echoes requests until shutdown is requested, then closes the connection and releases the latch. */
+		public void run ()
+		{
+			Message	req;
+
+			try
+			{
+				LOG.info("STARTING ECHO SERVICE");
+
+				while ( ! Shutdown_ind )
+				{
+					req = msg_cons.receive(100);
+					if ( req != null )
+					{
+						if ( LOG.isDebugEnabled() )
+							LOG.debug("ECHO request message " + req.toString());
+
+						resp_dest = req.getJMSReplyTo();
+						if ( resp_dest != null )
+						{
+							msg_prod = sess.createProducer(resp_dest);
+							msg_prod.send(req);
+							msg_prod.close();
+							msg_prod = null;
+						}
+						else
+						{
+							LOG.warn("invalid request: no reply-to destination given");
+						}
+					}
+				}
+			}
+			catch (Exception ex)
+			{
+				LOG.error(null, ex);
+			}
+			finally
+			{
+				LOG.info("shutting down test echo service");
+
+				try
+				{
+					jmsConn.close();	// stop() alone would leave the connection open
+				}
+				catch ( javax.jms.JMSException jms_exc )
+				{
+					LOG.warn("error on shutting down JMS connection", jms_exc);
+				}
+
+				synchronized ( this )
+				{
+					waitShutdown.countDown();
+				}
+			}
+		}
+
+
+		/**
+		 * Shut down the service, waiting up to 3 seconds for the service to terminate.
+		 */
+		public void shutdown ()
+		{
+			CountDownLatch	wait_l;
+
+			synchronized ( this )
+			{
+				wait_l = waitShutdown;
+			}
+
+			Shutdown_ind = true;
+			try
+			{
+				if ( wait_l != null )
+				{
+					if ( wait_l.await(3000, TimeUnit.MILLISECONDS) )
+						LOG.info("echo service shutdown complete");
+					else
+						LOG.warn("timeout waiting for echo service shutdown");
+				}
+				else
+				{
+					LOG.info("echo service shutdown: service does not appear to be active");
+				}
+			}
+			catch ( InterruptedException int_exc )
+			{
+				LOG.warn("interrupted while waiting for echo service shutdown");
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+}