You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/18 14:11:40 UTC
svn commit: r497425 - in /incubator/qpid/trunk/qpid/java: ./ etc/ perftests/
perftests/src/main/java/org/apache/qpid/ping/
perftests/src/main/java/org/apache/qpid/requestreply/
perftests/src/test/java/org/apache/qpid/ping/
perftests/src/test/java/org/a...
Author: rgreig
Date: Thu Jan 18 05:11:39 2007
New Revision: 497425
URL: http://svn.apache.org/viewvc?view=rev&rev=497425
Log:
(Patch submitted by Rupert Smith)
Restructured the ping tests, they now share common base classes to avoid cut and paste coding.
Added:
incubator/qpid/trunk/qpid/java/etc/
incubator/qpid/trunk/qpid/java/etc/log4j.xml
incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
Removed:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java
Modified:
incubator/qpid/trunk/qpid/java/perftests/pom.xml
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
incubator/qpid/trunk/qpid/java/pom.xml
Added: incubator/qpid/trunk/qpid/java/etc/log4j.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/etc/log4j.xml?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/etc/log4j.xml (added)
+++ incubator/qpid/trunk/qpid/java/etc/log4j.xml Thu Jan 18 05:11:39 2007
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- ===================================================================== -->
+<!-- -->
+<!-- Log4j Configuration -->
+<!-- -->
+<!-- ===================================================================== -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <!-- ============================== -->
+ <!-- Append messages to the console -->
+ <!-- ============================== -->
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="ALL"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- The default pattern: Date Priority [Category] Message\n -->
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+
+ <category name="org.apache.qpid">
+ <priority value="${amqj.logging.level}" logger="CONSOLE"/>
+ </category>
+
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root level="${root.logging.level}">
+ <appender-ref ref="CONSOLE"/>
+ </root>
+
+</log4j:configuration>
Added: incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml (added)
+++ incubator/qpid/trunk/qpid/java/perftests/jar-with-dependencies.xml Thu Jan 18 05:11:39 2007
@@ -0,0 +1,29 @@
+<!-- This is an assembly descriptor that produces a jar file that contains all the
+ dependencies, fully expanded into a single jar, required to run the tests of
+ a maven project.
+ -->
+<assembly>
+ <id>all-test-deps</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory></outputDirectory>
+ <outputFileNameMapping></outputFileNameMapping>
+ <unpack>true</unpack>
+ <scope>test</scope>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>target/classes</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>target/test-classes</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
Modified: incubator/qpid/trunk/qpid/java/perftests/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/pom.xml?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/perftests/pom.xml Thu Jan 18 05:11:39 2007
@@ -97,7 +97,7 @@
./bin/script_name or ./bin/script_name.bat
- These scripts can find everything in the 'all test dependencies' jar.
+ These scripts can find everything in the 'all test dependencies' jar created by the assembly:assembly goal.
-->
<!--
<plugin>
@@ -146,7 +146,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
- <version>${assembly.version}</version>
+ <!--<version>2.2-SNAPSHOT</version>-->
<configuration>
<descriptors>
<descriptor>jar-with-dependencies.xml</descriptor>
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java Thu Jan 18 05:11:39 2007
@@ -1,9 +1,13 @@
package org.apache.qpid.ping;
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.Session;
/**
@@ -19,19 +23,20 @@
*/
public abstract class AbstractPingClient
{
+ /** Used to format time stamping output. */
+ protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
+
private static final Logger _logger = Logger.getLogger(TestPingClient.class);
+ private AMQConnection _connection;
- /** Used to keep a handle on the JMS session to send replies using. */
- protected Session _session;
+ public AMQConnection getConnection()
+ {
+ return _connection;
+ }
- /**
- * Creates an abstract ping client to manage the specified transcation.
- *
- * @param session The session.
- */
- public AbstractPingClient(Session session)
+ public void setConnection(AMQConnection _connection)
{
- _session = session;
+ this._connection = _connection;
}
/**
@@ -39,13 +44,13 @@
*
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*/
- protected void commitTx() throws JMSException
+ protected void commitTx(Session session) throws JMSException
{
- if (_session.getTransacted())
+ if (session.getTransacted())
{
try
{
- _session.commit();
+ session.commit();
_logger.trace("Session Commited.");
}
catch (JMSException e)
@@ -54,7 +59,7 @@
try
{
- _session.rollback();
+ session.rollback();
_logger.debug("Message rolled back.");
}
catch (JMSException jmse)
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java Thu Jan 18 05:11:39 2007
@@ -1,5 +1,7 @@
package org.apache.qpid.ping;
+import java.text.SimpleDateFormat;
+
import javax.jms.*;
import org.apache.log4j.Logger;
@@ -15,9 +17,10 @@
*
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Manage session.
+ * <tr><td> Manage the connection.
* <tr><td> Provide clean shutdown on exception or shutdown hook.
* <tr><td> Provide useable shutdown hook implementation.
+ * <tr><td> Run a ping loop.
* </table>
*
* @author Rupert Smith
@@ -26,51 +29,66 @@
{
private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
- /** Holds the current Qpid session to send and receive pings on. */
- protected Session _session;
+ /** Used to format time stamping output. */
+ protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
/** Used to tell the ping loop when to terminate, it only runs while this is true. */
protected boolean _publish = true;
+ /** Holds the connection handle to the broker. */
+ private Connection _connection;
+
+ /** Holds the producer session, need to create test messages. */
+ private Session _producerSession;
+
/**
- * Creates an AbstractPingProducer on a session.
+ * Convenience method for a short pause.
+ *
+ * @param sleepTime The time in milliseconds to pause for.
*/
- public AbstractPingProducer(Session session)
+ public static void pause(long sleepTime)
{
- _session = session;
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException ie)
+ { }
+ }
}
+ public abstract void pingLoop();
+
/**
* Generates a test message of the specified size.
*
- * @param session The Qpid session under which to generate the message.
* @param replyQueue The reply-to destination for the message.
* @param messageSize The desired size of the message in bytes.
- * @param currentTime The timestamp to add to the message as a "timestamp" property.
*
* @return A freshly generated test message.
*
* @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
*/
- public static ObjectMessage getTestMessage(Session session, Queue replyQueue, int messageSize, long currentTime,
- boolean persistent) throws JMSException
+ public ObjectMessage getTestMessage(Queue replyQueue, int messageSize, boolean persistent) throws JMSException
{
ObjectMessage msg;
if (messageSize != 0)
{
- msg = TestMessageFactory.newObjectMessage(session, messageSize);
+ msg = TestMessageFactory.newObjectMessage(_producerSession, messageSize);
}
else
{
- msg = session.createObjectMessage();
+ msg = _producerSession.createObjectMessage();
}
// Set the messages persistent delivery flag.
msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
// Timestamp the message.
- msg.setLongProperty("timestamp", currentTime);
+ msg.setLongProperty("timestamp", System.currentTimeMillis());
// Ensure that the temporary reply queue is set as the reply to destination for the message.
if (replyQueue != null)
@@ -82,26 +100,6 @@
}
/**
- * Convenience method for a short pause.
- *
- * @param sleepTime The time in milliseconds to pause for.
- */
- public static void pause(long sleepTime)
- {
- if (sleepTime > 0)
- {
- try
- {
- Thread.sleep(sleepTime);
- }
- catch (InterruptedException ie)
- { }
- }
- }
-
- public abstract void pingLoop();
-
- /**
* Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
* flag has been cleared.
*/
@@ -151,18 +149,38 @@
});
}
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ public void setConnection(Connection connection)
+ {
+ this._connection = connection;
+ }
+
+ public Session getProducerSession()
+ {
+ return _producerSession;
+ }
+
+ public void setProducerSession(Session session)
+ {
+ this._producerSession = session;
+ }
+
/**
* Convenience method to commit the transaction on the session associated with this pinger.
*
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*/
- protected void commitTx() throws JMSException
+ protected void commitTx(Session session) throws JMSException
{
- if (_session.getTransacted())
+ if (session.getTransacted())
{
try
{
- _session.commit();
+ session.commit();
_logger.trace("Session Commited.");
}
catch (JMSException e)
@@ -177,7 +195,7 @@
try
{
- _session.rollback();
+ session.rollback();
_logger.trace("Message rolled back.");
}
catch (JMSException jmse)
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java Thu Jan 18 05:11:39 2007
@@ -43,6 +43,8 @@
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide command line invocation to start the ping consumer on a configurable broker url.
* </table>
+ *
+ * @todo Add a better command line interpreter to the main method. The command line is not very nice...
*/
class TestPingClient extends AbstractPingClient implements MessageListener
{
@@ -51,20 +53,42 @@
/** Used to indicate that the reply generator should log timing info to the console (logger info level). */
private boolean _verbose = false;
+ /** The producer session. */
+ private Session _consumerSession;
+
/**
- * Creates a PingPongClient on the specified session.
+ * Creates a TestPingClient on the specified session.
+ *
+ * @param brokerDetails
+ * @param username
+ * @param password
+ * @param queueName
+ * @param virtualpath
+ * @param transacted
+ * @param selector
+ * @param verbose
*
- * @param session The JMS Session for the ping pon client to run on.
- * @param consumer The message consumer to receive the messages with.
- * @param verbose If set to <tt>true</tt> will output timing information on every message.
+ * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
*/
- public TestPingClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException
+ public TestPingClient(String brokerDetails, String username, String password, String queueName, String virtualpath,
+ boolean transacted, String selector, boolean verbose) throws Exception
{
- // Hang on to the session for the replies.
- super(session);
-
- // Set this up to listen for messages on the queue.
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+
+ // Create a transactional or non-transactional session depending on the command line parameter.
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ // Connect a consumer to the ping queue and register this to be called back by it.
+ Queue q = new AMQQueue(queueName);
+ MessageConsumer consumer = _consumerSession.createConsumer(q, 1, false, false, selector);
consumer.setMessageListener(this);
+
+ // Hang on to the verbose flag setting.
+ _verbose = verbose;
}
/**
@@ -72,57 +96,32 @@
*
* @param args
*/
- public static void main(String[] args)
+ public static void main(String[] args) throws Exception
{
_logger.info("Starting...");
// Display help on the command line.
if (args.length < 4)
{
- System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]");
+ System.out.println(
+ "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector]");
System.exit(1);
}
- // Extract all comman line parameters.
+ // Extract all command line parameters.
String brokerDetails = args[0];
String username = args[1];
String password = args[2];
String virtualpath = args[3];
- boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
- String selector = (args.length == 6) ? args[5] : null;
-
- try
- {
- InetAddress address = InetAddress.getLocalHost();
-
- AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath);
-
- _logger.info("Connected with URL:" + con1.toURL());
-
- // Create a transactional or non-transactional session depending on the command line parameter.
- Session session = null;
-
- if (transacted)
- {
- session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED);
- }
- else if (!transacted)
- {
- session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- Queue q = new AMQQueue("ping");
-
- MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector);
- new TestPingClient(session, consumer, true);
-
- con1.start();
- }
- catch (Throwable t)
- {
- System.err.println("Fatal error: " + t);
- t.printStackTrace();
- }
+ String queueName = (args.length >= 5) ? args[4] : "ping";
+ boolean verbose = (args.length >= 6) ? Boolean.parseBoolean(args[5]) : true;
+ boolean transacted = (args.length >= 7) ? Boolean.parseBoolean(args[6]) : false;
+ String selector = (args.length == 8) ? args[7] : null;
+
+ // Create the test ping client and set it running.
+ TestPingClient pingClient =
+ new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose);
+ pingClient.getConnection().start();
System.out.println("Waiting...");
}
@@ -145,12 +144,12 @@
if (timestamp != null)
{
long diff = System.currentTimeMillis() - timestamp;
- _logger.info("Ping time: " + diff);
+ System.out.println("Ping time: " + diff);
}
}
// Commit the transaction if running in transactional mode.
- commitTx();
+ commitTx(_consumerSession);
}
catch (JMSException e)
{
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java Thu Jan 18 05:11:39 2007
@@ -21,6 +21,8 @@
package org.apache.qpid.ping;
import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import javax.jms.*;
@@ -57,49 +59,49 @@
/** Used to define how long to wait between pings. */
private static final long SLEEP_TIME = 250;
- /** Used to define how long to wait before assuming that a ping has timed out. */
- private static final long TIMEOUT = 3000;
-
/** Holds the name of the queue to send pings on. */
private static final String PING_QUEUE_NAME = "ping";
+
private static TestPingProducer _pingProducer;
/** Holds the message producer to send the pings through. */
private MessageProducer _producer;
/** Determines whether this producer sends persistent messages from the run method. */
- private boolean _persistent;
+ private boolean _persistent = false;
/** Holds the message size to send, from the run method. */
- private int _messageSize;
+ private int _messageSize = DEFAULT_MESSAGE_SIZE;
- public TestPingProducer(Session session, MessageProducer producer) throws JMSException
- {
- super(session);
- _producer = producer;
- }
+ /** Used to indicate that the ping loop should print out whenever it pings. */
+ private boolean _verbose = false;
- public TestPingProducer(Session session, MessageProducer producer, boolean persistent, int messageSize)
- throws JMSException
+ public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
+ boolean transacted, boolean persistent, int messageSize, boolean verbose) throws Exception
{
- this(session, producer);
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+
+ // Create a transactional or non-transactional session, based on the command line arguments.
+ setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
+
+ // Create a queue to send the pings on.
+ Queue pingQueue = new AMQQueue(queueName);
+ _producer = (MessageProducer) getProducerSession().createProducer(pingQueue);
_persistent = persistent;
_messageSize = messageSize;
+
+ _verbose = verbose;
}
/**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link TestPingClient} also needs
* to be started to bounce the pings back again.
*
- * <p/>The command line takes from 2 to 4 arguments:
- * <p/><table>
- * <tr><td>brokerDetails <td> The broker connection string.
- * <tr><td>virtualPath <td> The virtual path.
- * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions.
- * <tr><td>size <td> The size of ping messages to use, in bytes.
- * </table>
- *
* @param args The command line arguments as defined above.
*/
public static void main(String[] args) throws Exception
@@ -108,53 +110,33 @@
if (args.length < 2)
{
System.err.println(
- "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]");
+ "Usage: TestPingPublisher <brokerDetails> <virtual path> [verbose] [transacted] [persistent] [message size in bytes]");
System.exit(0);
}
String brokerDetails = args[0];
String virtualpath = args[1];
- boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
- boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
- int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
-
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
+ boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true;
+ boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
+ boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
+ int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
+
+ // Create a ping producer to generate the pings.
+ _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted,
+ persistent, messageSize, verbose);
- Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
-
- // Create a transactional or non-transactional session, based on the command line arguments.
- Session session;
-
- if (transacted)
- {
- session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- // Create a queue to send the pings on.
- Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
- MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
-
- // Create a ping producer to handle the request/wait/reply cycle.
- _pingProducer = new TestPingProducer(session, producer, persistent, messageSize);
-
- // Start the message consumers running.
- _connection.start();
+ // Start the connection running.
+ _pingProducer.getConnection().start();
// Create a shutdown hook to terminate the ping-pong producer.
Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
- // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
- _connection.setExceptionListener(_pingProducer);
+ // Ensure the ping loop execption listener is registered on the connection to terminate it on error.
+ _pingProducer.getConnection().setExceptionListener(_pingProducer);
+
+ // Start the ping loop running until it is interrupted.
Thread pingThread = new Thread(_pingProducer);
pingThread.run();
-
- // Run until the ping loop is terminated.
pingThread.join();
}
@@ -174,16 +156,7 @@
// Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
// this method, as the message will not be sent until the transaction is committed.
- commitTx();
- }
-
- /**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
- * flag has been cleared.
- */
- public void stop()
- {
- _publish = false;
+ commitTx(getProducerSession());
}
/**
@@ -195,12 +168,16 @@
try
{
// Generate a sample message and time stamp it.
- ObjectMessage msg = getTestMessage(_session, null, _messageSize, System.currentTimeMillis(), _persistent);
+ ObjectMessage msg = getTestMessage(null, _messageSize, _persistent);
msg.setLongProperty("timestamp", System.currentTimeMillis());
// Send the message.
ping(msg);
+ if (_verbose)
+ {
+ System.out.println("Pinged at: " + timestampFormatter.format(new Date())); //" + " with id: " + msg.getJMSMessageID());
+ }
// Introduce a short pause if desired.
pause(SLEEP_TIME);
}
Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Thu Jan 18 05:11:39 2007
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Date;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingClient;
+
+/**
+ * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
+ * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
+ * too.
+ *
+ * <p/>The message id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p/>When the a message is received, a reply to producer is created for it if it is not the same as the previous
+ * message. All subsequent replies are sent using that producer until a different reply to destination is
+ * encountered; effectively a last used cache of size 1. Fast because it saves creating the reply producer over and
+ * over again when the destination does not change. For a larger fixed set of reply to destinations could turn this
+ * into a cache with more elements.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
+ * </table>
+ *
+ * @todo Replace the command line parsing with a neater tool.
+ *
+ * @todo Make verbose accept a number of messages, only prints to console every X messages.
+ */
+public class PingPongBouncer extends AbstractPingClient implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
+
+ /** The default prefetch size for the message consumer. */
+ private static final int PREFETCH = 1;
+
+ /** The default no local flag for the message consumer. */
+ private static final boolean NO_LOCAL = true;
+
+ /** The default exclusive flag for the message consumer. */
+ private static final boolean EXCLUSIVE = false;
+
+ /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+ private boolean _verbose = false;
+
+ /** Determines whether this bounce back client bounces back messages persistently. */
+ private boolean _persistent = false;
+
+ /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
+ private Destination _lastResponseDest;
+
+ /** The cached, most recently used reply producer. */
+ private MessageProducer _cachedReplyProducer;
+
+ /** The consumer session. */
+ private Session _consumerSession;
+
+ /** The producer session. */
+ private Session _producerSession;
+
+ /**
+ * Creates a PingPongBouncer on the specified producer and consumer sessions.
+ *
+ * @param brokerDetails
+ * @param username
+ * @param password
+ * @param virtualpath
+ * @param queueName
+ * @param persistent
+ * @param transacted
+ * @param selector
+ * @param verbose
+ * @throws JMSException
+ *
+ * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
+ */
+ public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, String queueName,
+ boolean persistent, boolean transacted, String selector, boolean verbose) throws Exception
+ {
+ // Create a client id to uniquely identify this client.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientId = address.getHostName() + System.currentTimeMillis();
+
+ // Connect to the broker.
+ setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
+ _logger.info("Connected with URL:" + getConnection().toURL());
+
+ // Set up the failover notifier.
+ getConnection().setConnectionListener(new FailoverNotifier());
+
+ // Create a session to listen for messages on and one to send replies on, transactional depending on the
+ // command line option.
+ Session consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ Session producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the queue to listen for message on.
+ Queue q = new AMQQueue(queueName);
+ MessageConsumer consumer = consumerSession.createConsumer(q, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+
+ // Hang on to the sessions for the messages and replies.
+ _consumerSession = consumerSession;
+ _producerSession = producerSession;
+
+ _verbose = verbose;
+ _persistent = persistent;
+
+ // Set this up to listen for messages on the queue.
+ consumer.setMessageListener(this);
+ }
+
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
+ public static void main(String[] args) throws Exception
+ {
+ System.out.println("Starting...");
+
+ // Display help on the command line.
+ if (args.length < 5)
+ {
+ System.err.println("Usage: <brokerdetails> <username> <password> <virtual-path> <serviceQueue> "
+ + "[<P[ersistent]|N[onPersistent]> <T[ransacted]|N<onTransacted]>] [selector]");
+ System.exit(1);
+ }
+
+ // Extract all command line parameters.
+ String brokerDetails = args[0];
+ String username = args[1];
+ String password = args[2];
+ String virtualpath = args[3];
+ String queueName = args[4];
+ boolean persistent = ((args.length >= 6) && (args[5].toUpperCase().charAt(0) == 'P'));
+ boolean transacted = ((args.length >= 7) && (args[6].toUpperCase().charAt(0) == 'T'));
+ String selector = (args.length == 8) ? args[5] : null;
+
+ // Instantiate the ping pong client with the command line options and start it running.
+ PingPongBouncer pingBouncer =
+ new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent, transacted, selector,
+ true);
+ pingBouncer.getConnection().start();
+
+ System.out.println("Waiting...");
+ }
+
+ /**
+ * This is a callback method that is notified of all messages for which this has been registered as a message
+ * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
+ * destination of the message.
+ *
+ * @param message The message that triggered this callback.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ String messageCorrelationId = message.getJMSCorrelationID();
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Get the reply to destination from the message and check it is set.
+ Destination responseDest = message.getJMSReplyTo();
+
+ if (responseDest == null)
+ {
+ _logger.debug("Producer not created because the response destination is null.");
+
+ return;
+ }
+
+ // Check if the reply to destination is different to the last message and create a new producer if so.
+ if (!responseDest.equals(_lastResponseDest))
+ {
+ _lastResponseDest = responseDest;
+
+ _logger.debug("About to create a producer.");
+ _cachedReplyProducer = _producerSession.createProducer(responseDest);
+ _cachedReplyProducer.setDisableMessageTimestamp(true);
+ _cachedReplyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ _logger.debug("After create a producer.");
+ }
+
+ // Spew out some timing information if verbose mode is on.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Time to bounce point: " + diff);
+ }
+ }
+
+ // Correlate the reply to the original.
+ message.setJMSCorrelationID(messageCorrelationId);
+
+ // Send the receieved message as the pong reply.
+ _cachedReplyProducer.send(message);
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
+ + messageCorrelationId);
+ }
+
+ // Commit the transaction if running in transactional mode.
+ commitTx(_producerSession);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * A connection listener that logs out any failover complete events. Could do more interesting things with this
+ * at some point...
+ */
+ public static class FailoverNotifier implements ConnectionListener
+ {
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback.");
+ }
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Thu Jan 18 05:11:39 2007
@@ -20,22 +20,28 @@
*/
package org.apache.qpid.requestreply;
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.ping.AbstractPingProducer;
-import org.apache.qpid.util.concurrent.BooleanLatch;
-
-import javax.jms.*;
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
/**
* PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
- * client (see {@link org.apache.qpid.requestreply.PingPongClient} for the bounce back client). It is designed to be run from the command line
+ * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
* as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
* message producer and message consumer to run the ping-pong cycle on.
*
@@ -75,8 +81,20 @@
/** Holds the name of the queue to send pings on. */
private static final String PING_QUEUE_NAME = "ping";
+ /** The batch size. */
+ private static final int BATCH_SIZE = 100;
+
/** Keeps track of the ping producer instance used in the run loop. */
private static PingPongProducer _pingProducer;
+ private static final int PREFETCH = 100;
+ private static final boolean NO_LOCAL = true;
+ private static final boolean EXCLUSIVE = false;
+
+ /** The number of priming loops to run. */
+ private static final int PRIMING_LOOPS = 3;
+
+ /** A source for providing sequential unique correlation ids. */
+ private AtomicLong idGenerator = new AtomicLong(0L);
/** Holds the message producer to send the pings through. */
private MessageProducer _producer;
@@ -91,32 +109,65 @@
private int _messageSize;
/** Holds a map from message ids to latches on which threads wait for replies. */
- private Map<String, BooleanLatch> trafficLights = new HashMap<String, BooleanLatch>();
+ private Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+
+ /** Used to indicate that the ping loop should print out whenever it pings. */
+ private boolean _verbose = false;
- /** Holds a map from message ids to correlated replies. */
- private Map<String, Message> replies = new HashMap<String, Message>();
+ private Session _consumerSession;
- public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer)
- throws JMSException
+ /**
+ * Creates a ping pong producer with the specified connection details and type.
+ *
+ * @param brokerDetails
+ * @param username
+ * @param password
+ * @param virtualpath
+ * @param transacted
+ * @param persistent
+ * @param messageSize
+ * @param verbose
+ *
+ * @throws Exception All allowed to fall through. This is only test code...
+ */
+ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
+ String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose)
+ throws Exception
{
- super(session);
- _producer = producer;
- _replyQueue = replyQueue;
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+ // Create transactional or non-transactional sessions, based on the command line arguments.
+ setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a queue and producer to send the pings on.
+ Queue pingQueue = new AMQQueue(queueName);
+ _producer = (MessageProducer) getProducerSession().createProducer(pingQueue);
+ _producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ // Create a temporary queue to get the pongs on.
+ _replyQueue = _consumerSession.createTemporaryQueue();
+
+ // Create a message consumer to get the replies with and register this to be called back by it.
+ MessageConsumer consumer = _consumerSession.createConsumer(_replyQueue, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
consumer.setMessageListener(this);
- }
- public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer,
- boolean persistent, int messageSize) throws JMSException
- {
- this(session, replyQueue, producer, consumer);
+ // Run a few priming pings to remove warm up time from test results.
+ prime(PRIMING_LOOPS);
_persistent = persistent;
_messageSize = messageSize;
+
+ _verbose = verbose;
}
/**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
* to be started to bounce the pings back again.
*
* <p/>The command line takes from 2 to 4 arguments:
@@ -141,59 +192,59 @@
String brokerDetails = args[0];
String virtualpath = args[1];
- boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
- boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
- int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
-
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
-
- Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
-
- // Create a transactional or non-transactional session, based on the command line arguments.
- Session session;
-
- if (transacted)
- {
- session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- // Create a queue to send the pings on.
- Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
- MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
-
- // Create a temporary queue to reply with the pongs on.
- Queue replyQueue = session.createTemporaryQueue();
-
- // Create a message consumer to get the replies with.
- MessageConsumer consumer = session.createConsumer(replyQueue);
+ boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true;
+ boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
+ boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
+ int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
// Create a ping producer to handle the request/wait/reply cycle.
- _pingProducer = new PingPongProducer(session, replyQueue, producer, consumer, persistent, messageSize);
-
- // Start the message consumers running.
- _connection.start();
+ _pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, null, transacted,
+ persistent, messageSize, verbose);
+ _pingProducer.getConnection().start();
// Create a shutdown hook to terminate the ping-pong producer.
Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
- // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
- _connection.setExceptionListener(_pingProducer);
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ _pingProducer.getConnection().setExceptionListener(_pingProducer);
+
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
Thread pingThread = new Thread(_pingProducer);
pingThread.run();
-
- // Run until the ping loop is terminated.
pingThread.join();
}
/**
+ * Primes the test loop by sending a few messages, then introducing a short wait. This allows the bounce back client
+ * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
+ * this a few times, in order to prime the JVMs JIT compilation.
+ *
+ * @param x The number of priming loops to run.
+ *
+ * @throws JMSException All underlying exceptions are allowed to fall through.
+ */
+ public void prime(int x) throws JMSException
+ {
+ for (int i = 0; i < x; i++)
+ {
+ // Create and send a small message.
+ Message first = getTestMessage(_replyQueue, 0, false);
+ _producer.send(first);
+ commitTx(getProducerSession());
+
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignore)
+ { }
+ }
+ }
+
+ /**
* Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
- * correlating reply may be waiting on.
+ * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected
+ * in the replies map.
*
* @param message The received message.
*/
@@ -201,21 +252,37 @@
{
try
{
- // Store the reply.
+ // Store the reply, if it has a correlation id that is expected.
String correlationID = message.getJMSCorrelationID();
- replies.put(correlationID, message);
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
+ }
// Turn the traffic light to green.
- BooleanLatch trafficLight = trafficLights.get(correlationID);
+ CountDownLatch trafficLight = trafficLights.get(correlationID);
if (trafficLight != null)
{
- trafficLight.signal();
+ _logger.debug("Reply was expected, decrementing the latch for the id.");
+ trafficLight.countDown();
}
else
{
_logger.debug("There was no thread waiting for reply: " + correlationID);
}
+
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Time for round trip: " + diff);
+ }
+ }
}
catch (JMSException e)
{
@@ -224,50 +291,90 @@
}
/**
- * Sends the specified ping message and then waits for a correlating reply. If the wait times out before a reply
- * arrives, then a null reply is returned from this method.
+ * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+ * before a reply arrives, then a null reply is returned from this method.
*
- * @param message The message to send.
- * @param timeout The timeout in milliseconds.
+ * @param message The message to send.
+ * @param numPings The number of ping messages to send.
+ * @param timeout The timeout in milliseconds.
*
- * @return The reply, or null if no reply arrives before the timeout.
+ * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+ * wait for all prematurely.
*
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
- public Message pingAndWaitForReply(Message message, long timeout) throws JMSException, InterruptedException
+ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
- _producer.send(message);
+ // Put a unique correlation id on the message before sending it.
+ String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
+ message.setJMSCorrelationID(messageCorrelationId);
- // Keep the messageId to correlate with the reply.
- String messageId = message.getJMSMessageID();
+ for (int i = 0; i < numPings; i++)
+ {
+ // Re-timestamp the message.
+ message.setLongProperty("timestamp", System.currentTimeMillis());
+
+ _producer.send(message);
+ }
// Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
// this method, as the message will not be sent until the transaction is committed.
- commitTx();
+ commitTx(getProducerSession());
+
+ // Keep the messageId to correlate with the reply.
+ //String messageId = message.getJMSMessageID();
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+ }
// Block the current thread until a reply to the message is received, or it times out.
- BooleanLatch trafficLight = new BooleanLatch();
- trafficLights.put(messageId, trafficLight);
+ CountDownLatch trafficLight = new CountDownLatch(numPings);
+ trafficLights.put(messageCorrelationId, trafficLight);
// Note that this call expects a timeout in nanoseconds, millisecond timeout is multiplied up.
- trafficLight.await(timeout * 1000);
+ trafficLight.await(timeout, TimeUnit.MILLISECONDS);
- // Check the replies to see if one was generated, if not then the reply timed out.
- Message result = replies.get(messageId);
+ // Work out how many replies were receieved.
+ int numReplies = numPings - (int) trafficLight.getCount();
- return result;
+ if ((numReplies < numPings) && _verbose)
+ {
+ _logger.info("Timed out before all replies received on id, " + messageCorrelationId);
+ }
+ else if (_verbose)
+ {
+ _logger.info("Got all replies on id, " + messageCorrelationId);
+ }
+
+ return numReplies;
}
/**
- * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
- * connection, this clears the publish flag which in turn will halt the ping loop.
+ * Sends the specified ping message but does not wait for a correlating reply.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
*
- * @param e The exception that triggered this callback method.
+ * @return The reply, or null if no reply arrives before the timeout.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
- public void onException(JMSException e)
+ public void pingNoWaitForReply(Message message, int numPings) throws JMSException, InterruptedException
{
- _publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ for (int i = 0; i < numPings; i++)
+ {
+ _producer.send(message);
+
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Pinged at.");
+ }
+ }
+
+ // Commit the transaction if running in transactional mode, to force the send now.
+ commitTx(getProducerSession());
}
/**
@@ -279,11 +386,11 @@
try
{
// Generate a sample message and time stamp it.
- ObjectMessage msg = getTestMessage(_session, _replyQueue, _messageSize, System.currentTimeMillis(), _persistent);
+ ObjectMessage msg = getTestMessage(_replyQueue, _messageSize, _persistent);
msg.setLongProperty("timestamp", System.currentTimeMillis());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, TIMEOUT);
+ pingAndWaitForReply(msg, BATCH_SIZE, TIMEOUT);
// Introduce a short pause if desired.
pause(SLEEP_TIME);
@@ -297,6 +404,39 @@
{
_publish = false;
_logger.debug("There was an interruption: " + e.getMessage(), e);
+ }
+ }
+
+ public Queue getReplyQueue()
+ {
+ return _replyQueue;
+ }
+
+ /**
+ * A connection listener that logs out any failover complete events. Could do more interesting things with this
+ * at some point...
+ */
+ public static class FailoverNotifier implements ConnectionListener
+ {
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback.");
}
}
}
Added: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java?view=auto&rev=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Thu Jan 18 05:11:39 2007
@@ -0,0 +1,180 @@
+package org.apache.qpid.ping;
+
+import java.net.InetAddress;
+import java.util.Properties;
+
+import javax.jms.*;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+/**
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingTestPerf extends TestCase //implements TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingTestPerf.class);
+
+ /** Holds the name of the property to get the test message size from. */
+ private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+
+ /** Holds the name of the property to get the ping queue name from. */
+ private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+
+ /** Holds the name of the property to get the test delivery mode from. */
+ private static final String PERSISTENT_MODE_PROPNAME = "persistent";
+
+ /** Holds the name of the property to get the test transactional mode from. */
+ private static final String TRANSACTED_PROPNAME = "transacted";
+
+ /** Holds the name of the property to get the test broker url from. */
+ private static final String BROKER_PROPNAME = "broker";
+
+ /** Holds the name of the property to get the test broker virtual path. */
+ private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+
+ /** Holds the size of message body to attach to the ping messages. */
+ private static final int MESSAGE_SIZE_DEFAULT = 0;
+
+ /** Holds the name of the queue to which pings are sent. */
+ private static final String PING_QUEUE_NAME_DEFAULT = "ping";
+
+ /** Holds the message delivery mode to use for the test. */
+ private static final boolean PERSISTENT_MODE_DEFAULT = false;
+
+ /** Holds the transactional mode to use for the test. */
+ private static final boolean TRANSACTED_DEFAULT = false;
+
+ /** Holds the default broker url for the test. */
+ private static final String BROKER_DEFAULT = "tcp://localhost:5672";
+
+ /** Holds the default virtual path for the test. */
+ private static final String VIRTUAL_PATH_DEFAULT = "/test";
+
+ /** Sets a default ping timeout. */
+ private static final long TIMEOUT = 3000;
+
+ // Sets up the test parameters with defaults.
+ static
+ {
+ setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
+ setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
+ setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
+ setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+ }
+
+ /** Holds the test ping-pong producer. */
+ private TestPingProducer _testPingProducer;
+
+ /** Holds the test ping client. */
+ private TestPingClient _testPingClient;
+
+ // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+ // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+ // of the test parameters to log with the results.
+ private Properties testParameters = System.getProperties();
+ //private Properties testParameters = new ContextualProperties(System.getProperties());
+
+ public PingTestPerf(String name)
+ {
+ super(name);
+ }
+
+ private static void setSystemPropertyIfNull(String propName, String propValue)
+ {
+ if (System.getProperty(propName) == null)
+ {
+ System.setProperty(propName, propValue);
+ }
+ }
+
+ public void testPingOk() throws Exception
+ {
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg =
+ _testPingProducer.getTestMessage(null, Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)),
+ Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)));
+
+ // Use the test timing controller to reset the test timer now and obtain the current time.
+ // This can be used to remove the message creation time from the test.
+ //TestTimingController timingUtils = getTimingController();
+ //long startTime = timingUtils.restart();
+
+ // Send the message.
+ _testPingProducer.ping(msg);
+
+ // Fail the test if the timeout was exceeded.
+ /*if (reply == null)
+ {
+ Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID());
+ }*/
+ }
+
+ protected void setUp() throws Exception
+ {
+ // Log4j will propagate the test name as a thread local in all log output.
+ NDC.push(getName());
+
+ // Ensure that the connection, session and ping queue are established, if they have not already been.
+ if (_testPingProducer == null)
+ {
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+ String username = "guest";
+ String password = "guest";
+ String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+ String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+ boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+ String selector = null;
+ boolean verbose = false;
+ int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ _testPingClient = new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted,
+ selector, verbose);
+
+ // Establish a ping-pong client on the ping queue to send the pings with.
+ _testPingProducer = new TestPingProducer(brokerDetails, username, password, virtualpath, queueName, transacted,
+ persistent, messageSize, verbose);
+
+ // Start the connections for client and producer running.
+ _testPingClient.getConnection().start();
+ _testPingProducer.getConnection().start();
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if ((_testPingClient != null) && (_testPingClient.getConnection() != null))
+ {
+ _testPingClient.getConnection().close();
+ }
+
+ if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
+ {
+ _testPingProducer.getConnection().close();
+ }
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+}
Modified: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Thu Jan 18 05:11:39 2007
@@ -16,6 +16,7 @@
import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
/**
* PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
@@ -42,7 +43,7 @@
*
* @author Rupert Smith
*/
-public class PingPongTestPerf extends TestCase implements ExceptionListener //, TimingControllerAware
+public class PingPongTestPerf extends TestCase //implements TimingControllerAware
{
private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
@@ -99,36 +100,15 @@
/** Holds the test ping-pong producer. */
private PingPongProducer _testPingProducer;
+ /** Holds the test ping client. */
+ private PingPongBouncer _testPingBouncer;
+
// Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
// the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
- // of the test parameters to log with the results.
+ // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
private Properties testParameters = System.getProperties();
//private Properties testParameters = new ContextualProperties(System.getProperties());
- /** Holds the connection to the broker. */
- private Connection _connection = null;
-
- /** Holds the current session to the broker. */
- private Session _session;
-
- /** Holds the destination to send the ping messages to. */
- private Queue _pingQueue;
-
- /** Holds the destination to send replies to. */
- private Queue _replyQueue;
-
- /** Holds a message producer, set up on the ping destination, to send messages through. */
- private MessageProducer _producer;
-
- /** Holds a message consumer, set up on the ping destination, to receive pings through. */
- private MessageConsumer _pingConsumer;
-
- /** Holds a message consumer, set up on the pong destination, to receive replies through. */
- private MessageConsumer _pongConsumer;
-
- /** Holds a failure flag, which gets set if the connection to the broker goes down. */
- private boolean _failure;
-
public PingPongTestPerf(String name)
{
super(name);
@@ -146,9 +126,8 @@
{
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- PingPongProducer.getTestMessage(_session, _replyQueue,
+ _testPingProducer.getTestMessage(_testPingProducer.getReplyQueue(),
Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)),
- System.currentTimeMillis(),
Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)));
// Use the test timing controller to reset the test timer now and obtain the current time.
@@ -157,77 +136,46 @@
//long startTime = timingUtils.restart();
// Send the message and wait for a reply.
- Message reply = _testPingProducer.pingAndWaitForReply(msg, TIMEOUT);
+ int numReplies = _testPingProducer.pingAndWaitForReply(msg, 1, TIMEOUT);
// Fail the test if the timeout was exceeded.
- if (reply == null)
+ if (numReplies != 1)
{
Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID());
}
}
- /**
- * This is a callback method that is registered to receive any JMSExceptions that occurr on the connection to
- * the broker. It sets a failure flag to indicate that there is an error condition.
- *
- * @param e The JMSException that triggered this callback method.
- *
- * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
- */
- public void onException(JMSException e)
- {
- // Set the failure flag.
- _failure = true;
-
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
- }
-
protected void setUp() throws Exception
{
// Log4j will propagate the test name as a thread local in all log output.
NDC.push(getName());
// Ensure that the connection, session and ping queue are established, if they have not already been.
- if (_connection == null)
+ if (_testPingProducer == null)
{
- // Create a client id that identifies the client machine.
- String clientID = InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
-
- // Connect to the broker.
- _connection = new AMQConnection(testParameters.getProperty(BROKER_PROPNAME), "guest", "guest", clientID,
- testParameters.getProperty(VIRTUAL_PATH_PROPNAME));
- _connection.setExceptionListener(this);
-
- // Create a transactional or non-transactional session, based on the test properties, if a session has not
- // already been created.
- if (Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)))
- {
- _session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- // Create a queue to send the pings on.
- _pingQueue = new AMQQueue(testParameters.getProperty(PING_QUEUE_NAME_PROPNAME));
- _producer = (MessageProducer) _session.createProducer(_pingQueue);
-
- // Create a temporary queue to reply with the pongs on.
- _replyQueue = _session.createTemporaryQueue();
-
- // Create the ping and pong consumers on their respective destinations.
- _pingConsumer = _session.createConsumer(_pingQueue);
- _pongConsumer = _session.createConsumer(_replyQueue);
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+ String username = "guest";
+ String password = "guest";
+ String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+ String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+ boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+ String selector = null;
+ boolean verbose = false;
+ int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
// Establish a bounce back client on the ping queue to bounce back the pings.
- new org.apache.qpid.requestreply.PingPongClient(_session, _pingConsumer, false);
+ _testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, queueName, persistent,
+ transacted, selector, verbose);
- // Establish a ping-pong client on the ping queue to send pings and wait for replies.
- _testPingProducer = new org.apache.qpid.requestreply.PingPongProducer(_session, _replyQueue, _producer,
- _pongConsumer);
-
- _connection.start();
+ // Establish a ping-pong client on the ping queue to send the pings with.
+ _testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath, queueName, selector,
+ transacted, persistent, messageSize, verbose);
+
+ // Start the connections for client and producer running.
+ _testPingBouncer.getConnection().start();
+ _testPingProducer.getConnection().start();
}
}
@@ -235,7 +183,15 @@
{
try
{
- _connection.close();
+ if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null))
+ {
+ _testPingBouncer.getConnection().close();
+ }
+
+ if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
+ {
+ _testPingProducer.getConnection().close();
+ }
}
finally
{
Modified: incubator/qpid/trunk/qpid/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/pom.xml?view=diff&rev=497425&r1=497424&r2=497425
==============================================================================
--- incubator/qpid/trunk/qpid/java/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/pom.xml Thu Jan 18 05:11:39 2007
@@ -99,7 +99,7 @@
<!--Versions for various plugins and features -->
<antrun.version>1.2-SNAPSHOT</antrun.version>
- <assembly.version>2.1</assembly.version>
+ <assembly.version>2.2-SNAPSHOT</assembly.version>
<cobertura.version>2.0</cobertura.version>
<compiler.version>2.0.1</compiler.version>
<dependency.plugin.version>1.0</dependency.plugin.version>
@@ -284,6 +284,8 @@
<property>
<name>amqj.logging.level</name>
<value>${amqj.logging.level}</value>
+ <!--<name>log4j.configuration</name>
+ <value>file:/${topDirectoryLocation}/etc/log4j.xml</value>-->
</property>
</systemproperties>
</configuration>