You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2009/11/11 01:21:27 UTC
svn commit: r834724 - in
/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit:
Client.java ErrorHandler.java MessageFactory.java Receiver.java Sender.java
TestLauncher.java
Author: rajith
Date: Wed Nov 11 00:21:27 2009
New Revision: 834724
URL: http://svn.apache.org/viewvc?rev=834724&view=rev
Log:
Moved MessageFactory to the tools module.
Added a Generic Sender and a Receiver.
They can be run standalone or used as a building block to create more complex tests.
TestLauncher is a utility to start a sender or receiver in multiple threads with some added plumbing.
Please refer to each class to see the full set of options available.
Added:
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
Removed:
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java
Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+public abstract class Client
+{
+ protected Connection con;
+ protected Session ssn;
+ protected boolean durable = false;
+ protected boolean transacted = false;
+ protected int txSize = 10;
+ protected int ack_mode = Session.AUTO_ACKNOWLEDGE;
+ protected String contentType = "application/octet-stream";
+ protected Destination dest = null;
+
+ protected long reportFrequency = 60000; // every min
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+
+ protected long startTime = System.currentTimeMillis();
+ protected ErrorHandler errorHandler = null;
+
+ public Client(Connection con) throws Exception
+ {
+ this.con = con;
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ txSize = Integer.getInteger("tx_size",10);
+ contentType = System.getProperty("content_type","application/octet-stream");
+ reportFrequency = Long.getLong("report_frequency", 60000);
+ }
+
+ public void close()
+ {
+ try
+ {
+ con.close();
+ }
+ catch (Exception e)
+ {
+ handleError("Error closing connection",e);
+ }
+ }
+
+ public void setErrorHandler(ErrorHandler h)
+ {
+ this.errorHandler = h;
+ }
+
+ public void handleError(String msg,Exception e)
+ {
+ if (errorHandler != null)
+ {
+ errorHandler.handleError(msg, e);
+ }
+ else
+ {
+ System.err.println(msg);
+ e.printStackTrace();
+ }
+ }
+}
Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,6 @@
+package org.apache.qpid.testkit;
+
+public interface ErrorHandler {
+
+ public void handleError(String msg,Exception e);
+}
Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * A generic receiver which consumers a stream of messages
+ * from a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It participates in a feedback loop to ensure the producer
+ * doesn't fill up the queue. If it receives an "End" msg
+ * it sends a reply to the replyTo address in that msg.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * sync_rcv - Whether to consume sync (instead of using a listener).
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs.
+ */
+public class Receiver extends Client implements MessageListener
+{
+ // Until addressing is properly supported.
+ protected enum Reliability {
+ AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE;
+
+ Reliability getReliability(String s)
+ {
+ if (s.equalsIgnoreCase("at_most_once"))
+ {
+ return AT_MOST_ONCE;
+ }
+ else if (s.equalsIgnoreCase("at_least_once"))
+ {
+ return AT_LEAST_ONCE;
+ }
+ else
+ {
+ return EXACTLY_ONCE;
+ }
+ }
+ };
+
+ long msg_count = 0;
+ int sequence = 0;
+ boolean sync_rcv = Boolean.getBoolean("sync_rcv");
+ boolean uniqueDests = Boolean.getBoolean("unique_dests");
+ Reliability reliability = Reliability.EXACTLY_ONCE;
+ MessageConsumer consumer;
+ List<Integer> duplicateMessages = new ArrayList<Integer>();
+
+ public Receiver(Connection con,Destination dest) throws Exception
+ {
+ super(con);
+ reliability = reliability.getReliability(System.getProperty("reliability","exactly_once"));
+ ssn = con.createSession(transacted,ack_mode);
+ consumer = ssn.createConsumer(dest);
+ if (!sync_rcv)
+ {
+ consumer.setMessageListener(this);
+ }
+
+ System.out.println("Operating in mode : " + reliability);
+ System.out.println("Receiving messages from : " + dest);
+ }
+
+ public void onMessage(Message msg)
+ {
+ handleMessage(msg);
+ }
+
+ public void run() throws Exception
+ {
+ while(true)
+ {
+ if(sync_rcv)
+ {
+ Message msg = consumer.receive();
+ handleMessage(msg);
+ }
+ Thread.sleep(reportFrequency);
+ System.out.println(df.format(System.currentTimeMillis())
+ + " - messages received : " + msg_count);
+ }
+ }
+
+ private void handleMessage(Message m)
+ {
+ try
+ {
+ if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+ {
+ MessageProducer temp = ssn.createProducer(m.getJMSReplyTo());
+ Message controlMsg = ssn.createTextMessage();
+ temp.send(controlMsg);
+ if (transacted)
+ {
+ ssn.commit();
+ }
+ temp.close();
+ }
+ else
+ {
+
+ int seq = m.getIntProperty("sequence");
+ if (uniqueDests)
+ {
+ if (seq == 0)
+ {
+ sequence = 0; // wrap around for each iteration
+ }
+
+ if (seq < sequence)
+ {
+ duplicateMessages.add(seq);
+ if (reliability == Reliability.EXACTLY_ONCE)
+ {
+ throw new Exception(": Received a duplicate message (expected="
+ + sequence + ",received=" + seq + ")" );
+ }
+ }
+ else if (seq == sequence)
+ {
+ sequence++;
+ msg_count ++;
+ }
+ else
+ {
+ // Multiple publishers are not allowed in this test case.
+ // So out of order messages are not allowed.
+ throw new Exception(": Received an out of order message (expected="
+ + sequence + ",received=" + seq + ")" );
+ }
+ }
+ // Please note that this test case doesn't expect duplicates
+ // When testing for transactions.
+ if (transacted && msg_count % txSize == 0)
+ {
+ ssn.commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ handleError("Exception receiving messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ // #3rd argument should be an address
+ // Any other properties is best configured via jvm args
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ // FIXME Need to add support for the new address format
+ // Then it's trivial to add destination for that.
+ Receiver rcv = new Receiver(con,null);
+ rcv.run();
+ }
+
+}
Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.tools.MessageFactory;
+
+/**
+ * A generic sender which sends a stream of messages
+ * to a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It has a feedback loop to ensure it doesn't fill
+ * up queues due to a slow consumer.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * msg_size (256)
+ * msg_count (10) - # messages before waiting for feedback
+ * sleep_time (1000 ms) - sleep time btw each iteration
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs.
+ */
+public class Sender extends Client
+{
+ protected int msg_size = 256;
+ protected int msg_count = 10;
+ protected int iterations = -1;
+ protected long sleep_time = 1000;
+
+ protected Destination dest = null;
+ protected Destination replyTo = null;
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+
+ protected MessageProducer producer;
+ Random gen = new Random(19770905);
+
+ public Sender(Connection con,Destination dest) throws Exception
+ {
+ super(con);
+ this.msg_size = Integer.getInteger("msg_size", 100);
+ this.msg_count = Integer.getInteger("msg_count", 10);
+ this.iterations = Integer.getInteger("iterations", -1);
+ this.sleep_time = Long.getLong("sleep_time", 1000);
+ this.ssn = con.createSession(transacted,Session.AUTO_ACKNOWLEDGE);
+ this.dest = dest;
+ this.producer = ssn.createProducer(dest);
+ this.replyTo = ssn.createTemporaryQueue();
+
+ System.out.println("Sending messages to : " + dest);
+ }
+
+ /*
+ * If msg_size not specified it generates a message
+ * between 500-1500 bytes.
+ */
+ protected Message getNextMessage() throws Exception
+ {
+ int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
+ Message msg = (contentType.equals("text/plain")) ?
+ MessageFactory.createTextMessage(ssn, s):
+ MessageFactory.createBytesMessage(ssn, s);
+
+ msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT
+ : DeliveryMode.NON_PERSISTENT);
+ return msg;
+ }
+
+ public void run()
+ {
+ try
+ {
+ boolean infinite = (iterations == -1);
+ for (int x=0; infinite || x < iterations; x++)
+ {
+ long now = System.currentTimeMillis();
+ if (now - startTime >= reportFrequency)
+ {
+ System.out.println(df.format(now) + " - iterations : " + x);
+ startTime = now;
+ }
+
+ for (int i = 0; i < msg_count; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setIntProperty("sequence",i);
+ producer.send(msg);
+ if (transacted && msg_count % txSize == 0)
+ {
+ ssn.commit();
+ }
+ }
+ TextMessage m = ssn.createTextMessage("End");
+ m.setJMSReplyTo(replyTo);
+ producer.send(m);
+
+ if (transacted)
+ {
+ ssn.commit();
+ }
+
+ MessageConsumer feedbackConsumer = ssn.createConsumer(replyTo);
+ feedbackConsumer.receive();
+ feedbackConsumer.close();
+ if (transacted)
+ {
+ ssn.commit();
+ }
+ Thread.sleep(sleep_time);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception sending messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ // #3rd argument should be an address
+ // Any other properties is best configured via jvm args
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ // FIXME Need to add support for the new address format
+ // Then it's trivial to add destination for that.
+ Sender sender = new Sender(con,null);
+ sender.run();
+ }
+}
Added: qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java?rev=834724&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java (added)
+++ qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java Wed Nov 11 00:21:27 2009
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
+
+/**
+ * A basic test case class that could launch a Sender/Receiver
+ * or both, each on it's own separate thread.
+ *
+ * If con_count == ssn_count, then each entity created will have
+ * it's own Connection. Else if con_count < ssn_count, then
+ * a connection will be shared by ssn_count/con_count # of entities.
+ *
+ * The if both sender and receiver options are set, it will
+ * share a connection.
+ *
+ * The following options are available as jvm args
+ * host, port
+ * con_count,ssn_count
+ * con_idle_time - which determines heartbeat
+ * sender, receiver - booleans which indicate which entity to create.
+ * Setting them both is also a valid option.
+ */
+public class TestLauncher implements ErrorHandler
+{
+ protected String host = "127.0.0.1";
+ protected int port = 5672;
+ protected int session_count = 1;
+ protected int connection_count = 1;
+ protected long connection_idle_time = 5000;
+ protected boolean sender = false;
+ protected boolean receiver = false;
+ protected String url;
+
+ protected String queue_name = "message_queue";
+ protected String exchange_name = "amq.direct";
+ protected String routing_key = "routing_key";
+ protected boolean uniqueDests = false;
+ protected boolean durable = false;
+ protected String failover = "";
+ protected AMQConnection controlCon;
+ protected Destination controlDest = null;
+ protected Session controlSession = null;
+ protected MessageProducer statusSender;
+ protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+ protected String testName;
+
+ public TestLauncher()
+ {
+ testName = System.getProperty("test_name","UNKNOWN");
+ host = System.getProperty("host", "127.0.0.1");
+ port = Integer.getInteger("port", 5672);
+ session_count = Integer.getInteger("ssn_count", 1);
+ connection_count = Integer.getInteger("con_count", 1);
+ connection_idle_time = Long.getLong("con_idle_time", 5000);
+ sender = Boolean.getBoolean("sender");
+ receiver = Boolean.getBoolean("receiver");
+
+ queue_name = System.getProperty("queue_name", "message_queue");
+ exchange_name = System.getProperty("exchange_name", "amq.direct");
+ routing_key = System.getProperty("routing_key", "routing_key");
+ failover = System.getProperty("failover", "");
+ uniqueDests = Boolean.getBoolean("unique_dests");
+ durable = Boolean.getBoolean("durable");
+
+ url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "?idle_timeout=" + connection_idle_time
+ + "'";
+
+ if (failover.equalsIgnoreCase("failover_exchange"))
+ {
+ url += "&failover='failover_exchange'";
+
+ System.out.println("Failover exchange " + url );
+ }
+ }
+
+ public void setUpControlChannel()
+ {
+ try
+ {
+ controlCon = new AMQConnection(url);
+ controlCon.start();
+
+ controlDest = new AMQQueue(new AMQShortString(""),
+ new AMQShortString("control"),
+ new AMQShortString("control"),
+ false, //exclusive
+ false, //auto-delete
+ false); // durable
+
+ // Create the session to setup the messages
+ controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ statusSender = controlSession.createProducer(controlDest);
+
+ }
+ catch (Exception e)
+ {
+ handleError("Error while setting up the test",e);
+ }
+ }
+
+ public void cleanup()
+ {
+ try
+ {
+ controlSession.close();
+ controlCon.close();
+ for (AMQConnection con : clients)
+ {
+ con.close();
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Error while tearing down the test",e);
+ }
+ }
+
+ public void start()
+ {
+ try
+ {
+
+ int ssn_per_con = session_count;
+ if (connection_count < session_count)
+ {
+ ssn_per_con = session_count/connection_count;
+ }
+
+ for (int i = 0; i< connection_count; i++)
+ {
+ AMQConnection con = new AMQConnection(url);
+ con.start();
+ clients.add(con);
+ for (int j = 0; j< ssn_per_con; j++)
+ {
+ String prefix = createPrefix(i,j);
+ Destination dest = createDest(prefix);
+ if (sender)
+ {
+ createSender(prefix,con,dest,this);
+ }
+
+ if (receiver)
+ {
+ createReceiver(prefix,con,dest,this);
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception while setting up the test",e);
+ }
+
+ }
+
+ protected void createReceiver(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Receiver rcv = new Receiver(con,dest);
+ rcv.setErrorHandler(h);
+ rcv.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Receiver", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Receive thread",e);
+ }
+
+ t.setName("ReceiverThread-" + index);
+ t.start();
+ }
+
+ protected void createSender(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Sender sender = new Sender(con, dest);
+ sender.setErrorHandler(h);
+ sender.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Sender", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Sender thread",e);
+ }
+
+ t.setName("SenderThread-" + index);
+ t.start();
+ }
+
+ public void handleError(String msg,Exception e)
+ {
+ // In case sending the message fails
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" @ ");
+ sb.append(df.format(new Date(System.currentTimeMillis())));
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+
+ try
+ {
+ TextMessage errorMsg = controlSession.createTextMessage();
+ errorMsg.setStringProperty("status", "error");
+ errorMsg.setStringProperty("desc", msg);
+ errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
+ errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
+ synchronized (this)
+ {
+ statusSender.send(errorMsg);
+ }
+ } catch (JMSException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ private String serializeStackTrace(Exception e)
+ {
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ PrintStream printStream = new PrintStream(bOut);
+ e.printStackTrace(printStream);
+ printStream.close();
+ return bOut.toString();
+ }
+
+ private String createPrefix(int i, int j)
+ {
+ return String.valueOf(i).concat(String.valueOf(j));
+ }
+
+ /**
+ * The following are supported.
+ *
+ * 1. A producer/consumer pair on a topic or a queue
+ * 2. A single producer with multiple consumers on topic/queue
+ *
+ * Multiple consumers on a topic will result in a private queue
+ * for each consumers.
+ *
+ * We want to avoid multiple producers on the same topic/queue
+ * as the queues will fill up in no time.
+ */
+ private Destination createDest(String prefix)
+ {
+ Destination dest = null;
+ if (exchange_name.equals("amq.topic"))
+ {
+ dest = new AMQTopic(
+ new AMQShortString(exchange_name),
+ new AMQShortString(uniqueDests ? prefix + routing_key :
+ routing_key),
+ false, //auto-delete
+ null, //queue name
+ durable);
+ }
+ else
+ {
+ dest = new AMQQueue(
+ new AMQShortString(exchange_name),
+ new AMQShortString(uniqueDests ? prefix + routing_key :
+ routing_key),
+ new AMQShortString(uniqueDests ? prefix + queue_name :
+ queue_name),
+ false, //exclusive
+ false, //auto-delete
+ durable);
+ }
+ return dest;
+ }
+
+ public static void main(String[] args)
+ {
+ final TestLauncher test = new TestLauncher();
+ test.setUpControlChannel();
+ test.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() { test.cleanup(); }
+ });
+
+ }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org