You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/11/11 01:21:27 UTC

svn commit: r834724 - in /qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit: Client.java ErrorHandler.java MessageFactory.java Receiver.java Sender.java TestLauncher.java

Author: rajith
Date: Wed Nov 11 00:21:27 2009
New Revision: 834724

URL: http://svn.apache.org/viewvc?rev=834724&view=rev
Log:
Moved MessageFactory to the tools module.
Added a Generic Sender and a Receiver. 
They can be run standalone or used as a building block to create more complex tests.
TestLauncher is a utility to start a sender or receiver in multiple threads with some added plumbing.
Please refer to each class to see the full set of options available.

Added:
    qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
    qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
    qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
    qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
    qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
Removed:
    qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java

Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+public abstract class Client
+{
+	protected Connection con;
+	protected Session ssn;
+    protected boolean durable = false;
+    protected boolean transacted = false;
+    protected int txSize = 10;
+    protected int ack_mode = Session.AUTO_ACKNOWLEDGE;
+    protected String contentType = "application/octet-stream";
+    protected Destination dest = null;
+        
+    protected long reportFrequency = 60000;  // every min
+    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+    protected NumberFormat nf = new DecimalFormat("##.00");
+
+    protected long startTime = System.currentTimeMillis();
+    protected ErrorHandler errorHandler = null;
+    
+    public Client(Connection con) throws Exception
+    {
+       this.con = con;       
+       durable = Boolean.getBoolean("durable");
+       transacted = Boolean.getBoolean("transacted");
+       txSize = Integer.getInteger("tx_size",10);
+       contentType = System.getProperty("content_type","application/octet-stream");    
+       reportFrequency = Long.getLong("report_frequency", 60000);
+    }
+
+    public void close()
+    {
+    	try
+    	{
+    		con.close();
+    	}
+    	catch (Exception e)
+    	{
+    		handleError("Error closing connection",e);
+    	}
+    }
+    
+    public void setErrorHandler(ErrorHandler h)
+    {
+    	this.errorHandler = h;
+    }
+    
+    public void handleError(String msg,Exception e)
+    {
+    	if (errorHandler != null)
+    	{
+    		errorHandler.handleError(msg, e);
+    	}
+    	else
+    	{
+    		System.err.println(msg);
+    		e.printStackTrace();
+    	}
+    }
+}

Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,6 @@
+package org.apache.qpid.testkit;
+
+public interface ErrorHandler {
+
+	public void handleError(String msg,Exception e);
+}

Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * A generic receiver which consumers a stream of messages
+ * from a given address in a broker (host/port) 
+ * until told to stop by killing it.
+ * 
+ * It participates in a feedback loop to ensure the producer
+ * doesn't fill up the queue. If it receives an "End" msg
+ * it sends a reply to the replyTo address in that msg.
+ * 
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity. 
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ * 
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err. 
+ * 
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable 
+ * via jvm args.
+ * 
+ * sync_rcv - Whether to consume sync (instead of using a listener).
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs. 
+ */
+public class Receiver extends Client implements MessageListener
+{
+	// Until addressing is properly supported.
+	protected enum Reliability {
+		AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE;
+		
+		Reliability getReliability(String s)
+		{
+			if (s.equalsIgnoreCase("at_most_once"))
+			{
+				return AT_MOST_ONCE;
+			}
+			else if (s.equalsIgnoreCase("at_least_once"))
+			{
+				return AT_LEAST_ONCE;
+			}
+			else
+			{
+				return EXACTLY_ONCE;
+			}
+		}
+	};
+	
+	long msg_count = 0;
+	int sequence = 0;
+	boolean sync_rcv = Boolean.getBoolean("sync_rcv");
+	boolean uniqueDests = Boolean.getBoolean("unique_dests");
+	Reliability reliability = Reliability.EXACTLY_ONCE;
+	MessageConsumer consumer;
+    List<Integer> duplicateMessages = new ArrayList<Integer>();
+    
+    public Receiver(Connection con,Destination dest) throws Exception
+    {
+    	super(con);
+    	reliability = reliability.getReliability(System.getProperty("reliability","exactly_once"));
+    	ssn = con.createSession(transacted,ack_mode);
+    	consumer = ssn.createConsumer(dest);
+    	if (!sync_rcv)
+    	{
+    		consumer.setMessageListener(this);
+    	}
+    	
+    	System.out.println("Operating in mode : " + reliability);
+    	System.out.println("Receiving messages from : " + dest);
+    }
+
+    public void onMessage(Message msg)
+    {    	
+    	handleMessage(msg);
+    }
+    
+    public void run() throws Exception
+    {
+    	while(true)
+    	{
+    		if(sync_rcv)
+    		{
+    			Message msg = consumer.receive();
+    			handleMessage(msg);
+    		}
+    		Thread.sleep(reportFrequency);
+    		System.out.println(df.format(System.currentTimeMillis())
+    				+ " - messages received : " + msg_count);
+    	}
+    }
+    
+    private void handleMessage(Message m)
+    {
+    	try
+        {   
+            if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+            {
+                MessageProducer temp = ssn.createProducer(m.getJMSReplyTo());
+                Message controlMsg = ssn.createTextMessage();
+                temp.send(controlMsg);
+                if (transacted)
+                {
+                    ssn.commit();
+                }
+                temp.close();
+            }
+            else
+            {   
+            	
+            	int seq = m.getIntProperty("sequence");   
+            	if (uniqueDests)
+            	{
+            		if (seq == 0)
+	                {
+            			sequence = 0; // wrap around for each iteration
+	                }
+            		
+	                if (seq < sequence)
+	                {                    
+	                    duplicateMessages.add(seq);
+	                    if (reliability == Reliability.EXACTLY_ONCE)
+	                    {
+	                    	throw new Exception(": Received a duplicate message (expected="
+	                    			+ sequence  + ",received=" + seq + ")" ); 
+	                    }                    
+	                }
+	                else if (seq == sequence)
+	                {
+	                	sequence++;
+	                	msg_count ++;
+	                }
+	                else
+	                {  
+	                	// Multiple publishers are not allowed in this test case.
+	                	// So out of order messages are not allowed.
+	                	throw new Exception(": Received an out of order message (expected="
+	                			+ sequence  + ",received=" + seq + ")" ); 
+	                }
+            	}
+                // Please note that this test case doesn't expect duplicates
+                // When testing for transactions.
+            	if (transacted && msg_count % txSize == 0)
+            	{
+            		ssn.commit();
+            	}
+            }
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        	handleError("Exception receiving messages",e);
+        }   	
+    }
+
+    // Receiver host port address
+    public static void main(String[] args) throws Exception
+    {
+    	String host = "127.0.0.1";
+    	int port = 5672;
+    	
+    	if (args.length > 0)
+    	{
+    		host = args[0];
+    	}
+    	if (args.length > 1)
+    	{
+    		port = Integer.parseInt(args[1]);
+    	}    	
+    	// #3rd argument should be an address
+        // Any other properties is best configured via jvm args    	
+        
+    	AMQConnection con = new AMQConnection(
+				"amqp://username:password@topicClientid/test?brokerlist='tcp://"
+						+ host + ":" + port + "'");
+        
+        // FIXME Need to add support for the new address format
+        // Then it's trivial to add destination for that.
+        Receiver rcv = new Receiver(con,null);
+        rcv.run();
+    }
+
+}

Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.tools.MessageFactory;
+
+/**
+ * A generic sender which sends a stream of messages
+ * to a given address in a broker (host/port) 
+ * until told to stop by killing it.
+ * 
+ * It has a feedback loop to ensure it doesn't fill
+ * up queues due to a slow consumer.
+ * 
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity. 
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ * 
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err. 
+ * 
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable 
+ * via jvm args.
+ * 
+ * msg_size (256) 
+ * msg_count (10) - # messages before waiting for feedback
+ * sleep_time (1000 ms) - sleep time btw each iteration
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs. 
+ */
+public class Sender extends Client
+{
+    protected int msg_size = 256;
+    protected int msg_count = 10;
+    protected int iterations = -1;
+    protected long sleep_time = 1000;
+
+    protected Destination dest = null;
+    protected Destination replyTo =  null;
+    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+    protected NumberFormat nf = new DecimalFormat("##.00");
+    
+    protected MessageProducer producer;
+    Random gen = new Random(19770905);
+    
+    public Sender(Connection con,Destination dest) throws Exception
+    {
+       super(con);
+       this.msg_size = Integer.getInteger("msg_size", 100);
+       this.msg_count = Integer.getInteger("msg_count", 10);
+       this.iterations = Integer.getInteger("iterations", -1);
+       this.sleep_time = Long.getLong("sleep_time", 1000);
+       this.ssn = con.createSession(transacted,Session.AUTO_ACKNOWLEDGE);
+       this.dest = dest;
+       this.producer = ssn.createProducer(dest);
+       this.replyTo = ssn.createTemporaryQueue();
+       
+       System.out.println("Sending messages to : " + dest);
+    }
+
+    /*
+     * If msg_size not specified it generates a message
+     * between 500-1500 bytes.
+     */
+    protected Message getNextMessage() throws Exception
+    {
+        int s =  msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
+        Message msg = (contentType.equals("text/plain")) ?
+                MessageFactory.createTextMessage(ssn, s):
+                MessageFactory.createBytesMessage(ssn, s);
+                
+       msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT
+				: DeliveryMode.NON_PERSISTENT);
+       return msg;
+    }
+         
+    public void run()
+    {
+    	try 
+    	{
+    		boolean infinite = (iterations == -1);
+			for (int x=0; infinite || x < iterations; x++)
+			{
+				long now = System.currentTimeMillis();
+			    if (now - startTime >= reportFrequency)
+			    {
+			    	System.out.println(df.format(now) + " - iterations : " + x);
+			    	startTime = now;
+			    }
+			    
+			    for (int i = 0; i < msg_count; i++)
+			    {
+			        Message msg = getNextMessage();
+			        msg.setIntProperty("sequence",i);
+			        producer.send(msg);
+			        if (transacted && msg_count % txSize == 0)
+			        {
+			        	ssn.commit();
+			        }
+			    }
+			    TextMessage m = ssn.createTextMessage("End");
+			    m.setJMSReplyTo(replyTo);
+			    producer.send(m);
+
+			    if (transacted)
+			    {
+			        ssn.commit();
+			    }
+
+			    MessageConsumer feedbackConsumer = ssn.createConsumer(replyTo);
+			    feedbackConsumer.receive();
+			    feedbackConsumer.close();
+			    if (transacted)
+			    {
+			        ssn.commit();
+			    }
+			    Thread.sleep(sleep_time);
+			}
+		}
+    	catch (Exception e)
+        {
+            handleError("Exception sending messages",e);
+        }   	
+    }
+    
+    // Receiver host port address
+    public static void main(String[] args) throws Exception
+    {
+    	String host = "127.0.0.1";
+    	int port = 5672;
+    	
+    	if (args.length > 0)
+    	{
+    		host = args[0];
+    	}
+    	if (args.length > 1)
+    	{
+    		port = Integer.parseInt(args[1]);
+    	}
+    	// #3rd argument should be an address
+        // Any other properties is best configured via jvm args
+    	
+        AMQConnection con = new AMQConnection(
+				"amqp://username:password@topicClientid/test?brokerlist='tcp://"
+						+ host + ":" + port + "'");
+        
+        // FIXME Need to add support for the new address format
+        // Then it's trivial to add destination for that.
+        Sender sender = new Sender(con,null);
+        sender.run();
+    }
+}

Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
+
+/**
+ * A basic test case class that could launch a Sender/Receiver
+ * or both, each on it's own separate thread.
+ * 
+ * If con_count == ssn_count, then each entity created will have
+ * it's own Connection. Else if con_count < ssn_count, then
+ * a connection will be shared by ssn_count/con_count # of entities.
+ * 
+ * The if both sender and receiver options are set, it will
+ * share a connection.   
+ *
+ * The following options are available as jvm args
+ * host, port
+ * con_count,ssn_count
+ * con_idle_time -  which determines heartbeat
+ * sender, receiver - booleans which indicate which entity to create.
+ * Setting them both is also a valid option.
+ */
+public class TestLauncher implements ErrorHandler
+{
+    protected String host = "127.0.0.1";
+    protected int port = 5672;
+    protected int session_count = 1;
+    protected int connection_count = 1;
+    protected long connection_idle_time = 5000;
+    protected boolean sender = false;
+    protected boolean receiver = false;
+    protected String url;
+
+    protected String queue_name =  "message_queue";
+    protected String exchange_name =  "amq.direct";
+    protected String routing_key =  "routing_key";
+    protected boolean uniqueDests = false;
+    protected boolean durable = false;
+    protected String failover = "";
+    protected AMQConnection controlCon;
+    protected Destination controlDest = null;
+    protected Session controlSession = null;
+    protected MessageProducer statusSender;
+    protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
+    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+    protected NumberFormat nf = new DecimalFormat("##.00");
+    protected String testName;    
+        
+    public TestLauncher()
+    {
+       testName = System.getProperty("test_name","UNKNOWN");
+       host = System.getProperty("host", "127.0.0.1");
+       port = Integer.getInteger("port", 5672);
+       session_count = Integer.getInteger("ssn_count", 1);
+       connection_count = Integer.getInteger("con_count", 1);
+       connection_idle_time = Long.getLong("con_idle_time", 5000);
+       sender = Boolean.getBoolean("sender");
+       receiver = Boolean.getBoolean("receiver");
+       
+       queue_name = System.getProperty("queue_name", "message_queue");
+       exchange_name = System.getProperty("exchange_name", "amq.direct");
+       routing_key = System.getProperty("routing_key", "routing_key");
+       failover = System.getProperty("failover", "");
+       uniqueDests = Boolean.getBoolean("unique_dests");
+       durable = Boolean.getBoolean("durable");
+       
+       url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+				+ host + ":" + port + "?idle_timeout=" + connection_idle_time
+				+ "'";
+       
+       if (failover.equalsIgnoreCase("failover_exchange"))
+       {
+    	   url += "&failover='failover_exchange'";
+    	   
+    	   System.out.println("Failover exchange " + url );
+       }
+    }
+
+    public void setUpControlChannel()
+    {
+        try
+        {
+            controlCon = new AMQConnection(url);
+            controlCon.start();
+            
+            controlDest = new AMQQueue(new AMQShortString(""),
+                                     new AMQShortString("control"),
+                                     new AMQShortString("control"),
+                                     false, //exclusive
+                                     false, //auto-delete
+                                     false); // durable
+
+            // Create the session to setup the messages
+            controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            statusSender = controlSession.createProducer(controlDest);
+
+        }
+        catch (Exception e)
+        {
+            handleError("Error while setting up the test",e);
+        }
+    }
+    
+    public void cleanup()
+    {
+    	try
+    	{
+    		controlSession.close();
+    		controlCon.close();
+    		for (AMQConnection con : clients)
+    		{
+    			con.close();
+    		}
+    	}
+	    catch (Exception e)
+	    {
+	        handleError("Error while tearing down the test",e);
+	    }
+    }
+        
+    public void start()
+    {
+        try
+        {
+           
+        	int ssn_per_con = session_count;
+        	if (connection_count < session_count)
+        	{
+        		ssn_per_con = session_count/connection_count;
+        	}
+        	
+        	for (int i = 0; i< connection_count; i++)
+        	{
+        		AMQConnection con = new AMQConnection(url);
+        		con.start();
+        		clients.add(con);        		
+        		for (int j = 0; j< ssn_per_con; j++)
+            	{
+        			String prefix = createPrefix(i,j);
+        			Destination dest = createDest(prefix);
+        			if (sender)
+        			{
+        				createSender(prefix,con,dest,this);
+        			}
+        			
+        			if (receiver)
+        			{
+        				createReceiver(prefix,con,dest,this);
+        			}
+            	}
+        	}
+        }
+        catch (Exception e)
+        {
+            handleError("Exception while setting up the test",e);
+        }
+
+    }
+    
+    protected void createReceiver(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+    {
+    	Runnable r = new Runnable()
+        {
+            public void run()
+            {
+               try 
+               {
+            	   Receiver rcv = new Receiver(con,dest);
+				   rcv.setErrorHandler(h);
+				   rcv.run();
+				}
+	            catch (Exception e) 
+	            {
+					h.handleError("Error Starting Receiver", e);
+				}
+            }
+        };
+        
+        Thread t = null;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                      
+        }
+        catch(Exception e)
+        {
+            handleError("Error creating Receive thread",e);
+        }
+        
+        t.setName("ReceiverThread-" + index);
+        t.start();
+    }
+    
+    protected void createSender(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+    {
+    	Runnable r = new Runnable()
+        {
+            public void run()
+            {
+               try 
+               {
+            	   Sender sender = new Sender(con, dest);
+            	   sender.setErrorHandler(h);
+            	   sender.run();
+				}
+	            catch (Exception e) 
+	            {
+					h.handleError("Error Starting Sender", e);
+				}
+            }
+        };
+        
+        Thread t = null;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                      
+        }
+        catch(Exception e)
+        {
+            handleError("Error creating Sender thread",e);
+        }
+        
+        t.setName("SenderThread-" + index);
+        t.start();
+    }
+
+    public void handleError(String msg,Exception e)
+    {
+    	// In case sending the message fails
+        StringBuilder sb = new StringBuilder();
+        sb.append(msg);
+        sb.append(" @ ");
+        sb.append(df.format(new Date(System.currentTimeMillis())));
+        sb.append(" ");
+        sb.append(e.getMessage());
+        System.err.println(sb.toString());
+        e.printStackTrace();
+        
+        try 
+        {
+			TextMessage errorMsg = controlSession.createTextMessage();
+			errorMsg.setStringProperty("status", "error");
+			errorMsg.setStringProperty("desc", msg);
+			errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));        
+			errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
+			synchronized (this)
+			{
+				statusSender.send(errorMsg);
+			}
+		} catch (JMSException e1) {
+			e1.printStackTrace();
+		}       
+    }
+    
+    private String serializeStackTrace(Exception e)
+    {
+    	ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+    	PrintStream printStream = new PrintStream(bOut);
+    	e.printStackTrace(printStream);
+    	printStream.close();
+    	return bOut.toString();
+    }
+    
+    private String createPrefix(int i, int j)
+    {
+    	return String.valueOf(i).concat(String.valueOf(j));
+    }
+    
+    /**
+     * The following are supported.
+     * 
+     * 1. A producer/consumer pair on a topic or a queue
+     * 2. A single producer with multiple consumers on topic/queue
+     * 
+     * Multiple consumers on a topic will result in a private queue
+     * for each consumers.
+     * 
+     * We want to avoid multiple producers on the same topic/queue
+     * as the queues will fill up in no time.
+     */
+    private Destination createDest(String prefix)
+    {
+    	Destination dest = null;
+        if (exchange_name.equals("amq.topic"))
+        {
+            dest = new AMQTopic(
+            		 new AMQShortString(exchange_name),
+                     new AMQShortString(uniqueDests ? prefix + routing_key :
+							                                   routing_key),
+                     false,   //auto-delete
+                     null,   //queue name
+                     durable);
+        }
+        else
+        {
+            dest = new AMQQueue(
+            		new AMQShortString(exchange_name),
+            		new AMQShortString(uniqueDests ? prefix + routing_key :
+                                                              routing_key),
+                    new AMQShortString(uniqueDests ? prefix + queue_name :
+                    	                                      queue_name),
+                    false, //exclusive
+                    false, //auto-delete
+                    durable);
+        }
+        return dest;
+    }
+    
+    public static void main(String[] args)
+    {
+    	final TestLauncher test = new TestLauncher();
+    	test.setUpControlChannel();
+    	test.start();
+    	Runtime.getRuntime().addShutdownHook(new Thread() {
+    	    public void run() { test.cleanup(); }
+    	});
+
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org