You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/26 09:52:25 UTC

svn commit: r500188 [4/5] - in /incubator/qpid/branches/perftesting/qpid/java: ./ broker/ broker/distribution/ broker/distribution/src/ broker/distribution/src/main/ broker/distribution/src/main/assembly/ client/ client/distribution/ client/distributio...

Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.pingpong;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Topic;
+import javax.jms.JMSException;
+import java.net.InetAddress;
+
+public class TestPingSubscriber
+{
+    private static final Logger _logger = Logger.getLogger(TestPingSubscriber.class);
+
+    private static class TestPingMessageListener implements MessageListener
+    {
+        public TestPingMessageListener()
+        {
+        }
+
+        long _lastTimestamp = 0L;
+        long _lastTimestampString = 0L;
+
+        public void onMessage(javax.jms.Message message)
+        {
+            Long time = System.nanoTime();
+
+            if (_logger.isInfoEnabled())
+            {
+                long timestampString = 0L;
+
+                try
+                {
+                    long timestamp = message.getLongProperty("timestamp");
+                    timestampString = Long.parseLong(message.getStringProperty("timestampString"));
+
+                    if (timestampString != timestamp)
+                    {
+                        _logger.info("Timetamps differ!:\n" +
+                                     "timestamp:" + timestamp + "\n" +
+                                     "timestampString:" + timestampString);
+                    }
+
+                }
+                catch (JMSException jmse)
+                {
+                    _logger.error("JMSException caught:" + jmse.getMessage(), jmse);
+                }
+
+
+                long stringDiff = time - timestampString;
+
+                _logger.info("Ping: TS:" + stringDiff / 1000 + "us");
+
+                // _logger.info(_name + " got message '" + message + "\n");
+            }
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        _logger.info("Starting...");
+
+        if (args.length < 4)
+        {
+            System.out.println("Usage: brokerdetails username password virtual-path [selector] ");
+            System.exit(1);
+        }
+        try
+        {
+            InetAddress address = InetAddress.getLocalHost();
+            AMQConnection con1 = new AMQConnection(args[0], args[1], args[2],
+                                                   address.getHostName(), args[3]);
+
+            _logger.info("Connected with URL:" + con1.toURL());
+
+            final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session)
+                    con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+            String selector = null;
+
+            if (args.length == 5)
+            {
+                selector = args[4];
+                _logger.info("Message selector is <" + selector + ">...");
+            }
+            else
+            {
+                _logger.info("Not using message selector ");
+            }
+
+            Topic t = new AMQTopic("ping");
+
+            MessageConsumer consumer1 = session1.createConsumer(t,
+                                                                1, false, false, selector);
+
+            consumer1.setMessageListener(new TestPingMessageListener());
+            con1.start();
+        }
+        catch (Throwable t)
+        {
+            System.err.println("Fatal error: " + t);
+            t.printStackTrace();
+        }
+
+        System.out.println("Waiting...");
+    }
+}
+

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,67 @@
+package org.apache.qpid.ping;
+
+/**
+ * Throttle is a helper class used in situations where a controlled rate of processing is desired. It allows a certain
+ * number of operations-per-second to be defined and supplies a {@link #throttle} method that can only be called at
+ * most at that rate. The first call to the throttle method will return immediately, subsequent calls will introduce
+ * a short pause to fill out the remainder of the current cycle to attain the desired rate. If there is no remainder
+ * left then it will return immediately.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class Throttle
+{
+    /** Holds the length of the cycle in nano seconds. */
+    long cycleLengthNanos = 0L;
+
+    /** Records the nano time of the last call to the throttle method. */
+    long lastTimeNanos = 0L;
+
+    /**
+     * Sets up the desired rate of operation per second that the throttle method should restrict to.
+     *
+     * @param opsPerSecond The maximum number of calls per second that the throttle method will take.
+     */
+    public void setRate(int opsPerSecond)
+    {
+        // Calculate the length of a cycle.
+        cycleLengthNanos = 1000000000 / opsPerSecond;
+    }
+
+    /**
+     * Introduces a short pause to fill out any time left in the cycle since this method was last called, of length
+     * defined by a call to the {@link #setRate} method.
+     */
+    public void throttle()
+    {
+        // Record the time now.
+        long currentTimeNanos = System.nanoTime();
+
+        // Check if there is any time remaining in the current cycle and introduce a short wait to fill out the
+        // remainder of the cycle if needed.
+        long remainingTimeNanos = cycleLengthNanos - (currentTimeNanos - lastTimeNanos);
+
+        if (remainingTimeNanos > 0)
+        {
+            long milliWait = remainingTimeNanos / 1000000;
+            int nanoWait = (int) (remainingTimeNanos % 1000000);
+
+            try
+            {
+                Thread.currentThread().sleep(milliWait, nanoWait);
+            }
+            catch (InterruptedException e)
+            {
+                // Just ignore this?
+            }
+        }
+
+        // Keep the time of the last call to this method to calculate the next cycle.
+        //lastTimeNanos = currentTimeNanos;
+        lastTimeNanos = System.nanoTime();
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,300 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Date;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingClient;
+import org.apache.qpid.topic.Config;
+
+/**
+ * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
+ * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
+ * too.
+ *
+ * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
+ * </table>
+ *
+ * @todo Replace the command line parsing with a neater tool.
+ *
+ * @todo Make verbose accept a number of messages, only prints to console every X messages.
+ */
+public class PingPongBouncer extends AbstractPingClient implements MessageListener
+{
+    private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
+
+    /** The default prefetch size for the message consumer. */
+    private static final int PREFETCH = 1;
+
+    /** The default no local flag for the message consumer. */
+    private static final boolean NO_LOCAL = true;
+
+    private static final String DEFAULT_DESTINATION_NAME = "ping";
+
+    /** The default exclusive flag for the message consumer. */
+    private static final boolean EXCLUSIVE = false;
+
+    /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+    private boolean _verbose = false;
+
+    /** Determines whether this bounce back client bounces back messages persistently. */
+    private boolean _persistent = false;
+
+    private Destination _consumerDestination;
+
+    /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
+    private Destination _lastResponseDest;
+
+    /** The producer for sending replies with. */
+    private MessageProducer _replyProducer;
+
+    /** The consumer session. */
+    private Session _consumerSession;
+
+    /** The producer session. */
+    private Session _producerSession;
+
+    /**
+     * Creates a PingPongBouncer on the specified producer and consumer sessions.
+     *
+     * @param brokerDetails The addresses of the brokers to connect to.
+     * @param username        The broker username.
+     * @param password        The broker password.
+     * @param virtualpath     The virtual host name within the broker.
+     * @param destinationName The name of the queue to receive pings on
+     *                        (or root of the queue name where many queues are generated).
+     * @param persistent      A flag to indicate that persistent message should be used.
+     * @param transacted      A flag to indicate that pings should be sent within transactions.
+     * @param selector        A message selector to filter received pings with.
+     * @param verbose         A flag to indicate that message timings should be sent to the console.
+     *
+     * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
+     */
+    public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
+                           String destinationName, boolean persistent, boolean transacted, String selector,
+                           boolean verbose, boolean pubsub) throws Exception
+    {
+        // Create a client id to uniquely identify this client.
+        InetAddress address = InetAddress.getLocalHost();
+        String clientId = address.getHostName() + System.currentTimeMillis();
+        _verbose = verbose;
+        _persistent = persistent;
+        setPubSub(pubsub);
+        // Connect to the broker.
+        setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
+        _logger.info("Connected with URL:" + getConnection().toURL());
+
+        // Set up the failover notifier.
+        getConnection().setConnectionListener(new FailoverNotifier());
+
+        // Create a session to listen for messages on and one to send replies on, transactional depending on the
+        // command line option.
+        _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+        _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the queue to listen for message on.
+        createConsumerDestination(destinationName);
+        MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+
+        // Create a producer for the replies, without a default destination.
+        _replyProducer = _producerSession.createProducer(null);
+        _replyProducer.setDisableMessageTimestamp(true);
+        _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+        // Set this up to listen for messages on the queue.
+        consumer.setMessageListener(this);
+    }
+
+    private void createConsumerDestination(String name)
+    {
+        if (isPubSub())
+        {
+            _consumerDestination = new AMQTopic(name);
+        }
+        else
+        {
+            _consumerDestination = new AMQQueue(name);
+        }
+    }
+
+    /**
+     * Starts a stand alone ping-pong client running in verbose mode.
+     *
+     * @param args
+     */
+    public static void main(String[] args) throws Exception
+    {
+        System.out.println("Starting...");
+
+        // Display help on the command line.
+        if (args.length == 0)
+        {
+            _logger.info("Running test with default values...");
+            //usage();
+            //System.exit(0);
+        }
+
+        // Extract all command line parameters.
+        Config config = new Config();
+        config.setOptions(args);
+        String brokerDetails = config.getHost() + ":" + config.getPort();
+        String virtualpath = "/test";        
+        String destinationName = config.getDestination();
+        if (destinationName == null)
+        {
+            destinationName = DEFAULT_DESTINATION_NAME;
+        }
+        String selector = config.getSelector();
+        boolean transacted = config.isTransacted();
+        boolean persistent = config.usePersistentMessages();
+        boolean pubsub = config.isPubSub();
+        boolean verbose = true;
+
+        //String selector = null;
+
+        // Instantiate the ping pong client with the command line options and start it running.
+        PingPongBouncer pingBouncer = new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath,
+                                                destinationName, persistent, transacted, selector, verbose, pubsub);
+        pingBouncer.getConnection().start();
+
+        System.out.println("Waiting...");
+    }
+
+    /**
+     * This is a callback method that is notified of all messages for which this has been registered as a message
+     * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
+     * destination of the message.
+     *
+     * @param message The message that triggered this callback.
+     */
+    public void onMessage(Message message)
+    {
+        try
+        {
+            String messageCorrelationId = message.getJMSCorrelationID();
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
+                             + messageCorrelationId);
+            }
+
+            // Get the reply to destination from the message and check it is set.
+            Destination responseDest = message.getJMSReplyTo();
+
+            if (responseDest == null)
+            {
+                _logger.debug("Cannot send reply because reply-to destination is null.");
+
+                return;
+            }
+
+            // Spew out some timing information if verbose mode is on.
+            if (_verbose)
+            {
+                Long timestamp = message.getLongProperty("timestamp");
+
+                if (timestamp != null)
+                {
+                    long diff = System.currentTimeMillis() - timestamp;
+                    _logger.info("Time to bounce point: " + diff);
+                }
+            }
+
+            // Correlate the reply to the original.
+            message.setJMSCorrelationID(messageCorrelationId);
+
+            // Send the receieved message as the pong reply.
+            _replyProducer.send(responseDest, message);
+
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
+                             + messageCorrelationId);
+            }
+
+            // Commit the transaction if running in transactional mode.
+            commitTx(_producerSession);
+        }
+        catch (JMSException e)
+        {
+            _logger.debug("There was a JMSException: " + e.getMessage(), e);
+        }
+    }
+
+    private static void usage()
+    {
+        System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" +
+                           "-destinationname : queue/topic name\n" +
+                           "-transacted : (true/false). Default is false\n" +
+                           "-persistent : (true/false). Default is false\n" +
+                           "-pubsub     : (true/false). Default is false\n" +
+                           "-selector   : selector string\n");
+    }
+
+    /**
+     * A connection listener that logs out any failover complete events. Could do more interesting things with this
+     * at some point...
+     */
+    public static class FailoverNotifier implements ConnectionListener
+    {
+        public void bytesSent(long count)
+        { }
+
+        public void bytesReceived(long count)
+        { }
+
+        public boolean preFailover(boolean redirect)
+        {
+            return true;
+        }
+
+        public boolean preResubscribe()
+        {
+            return true;
+        }
+
+        public void failoverComplete()
+        {
+            _logger.info("App got failover complete callback.");
+        }
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Fri Jan 26 00:52:20 2007
@@ -0,0 +1,806 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingProducer;
+import org.apache.qpid.ping.Throttle;
+import org.apache.qpid.topic.Config;
+
+/**
+ * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
+ * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
+ * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
+ * message producer and message consumer to run the ping-pong cycle on.
+ * <p/>
+ * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
+ * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
+ * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
+ * this correlation would not need to be done.
+ * <p/>
+ * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
+ * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
+ * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
+ * also registered to terminate the ping-pong loop cleanly.
+ * <p/>
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a ping and wait for response cycle.
+ * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
+ * </table>
+ *
+ * @todo Make temp queue per ping a command line option.
+ * @todo Make the queue name a command line option.
+ */
+public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
+{
+    private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+
+    /**
+     * Used to set up a default message size.
+     */
+    protected static final int DEFAULT_MESSAGE_SIZE = 0;
+
+    /**
+     * This is set and used when the test is for multiple-destinations
+     */
+    protected static final int DEFAULT_DESTINATION_COUNT = 0;
+
+    protected static final int DEFAULT_RATE = 0;
+
+    /**
+     * Used to define how long to wait between pings.
+     */
+    protected static final long SLEEP_TIME = 250;
+
+    /**
+     * Used to define how long to wait before assuming that a ping has timed out.
+     */
+    protected static final long TIMEOUT = 9000;
+
+    /**
+     * Holds the name of the destination to send pings on.
+     */
+    protected static final String PING_DESTINATION_NAME = "ping";
+
+    /**
+     * The batch size.
+     */
+    protected static final int DEFAULT_BATCH_SIZE = 100;
+
+    protected static final int PREFETCH = 100;
+    protected static final boolean NO_LOCAL = true;
+    protected static final boolean EXCLUSIVE = false;
+
+    /**
+     * The number of priming loops to run.
+     */
+    protected static final int PRIMING_LOOPS = 3;
+
+    /**
+     * A source for providing sequential unique correlation ids.
+     */
+    private static AtomicLong idGenerator = new AtomicLong(0L);
+
+    /**
+     * Holds a map from message ids to latches on which threads wait for replies.
+     */
+    private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+
+    /**
+     * Destination where the responses messages will arrive
+     */
+    private Destination _replyDestination;
+
+    /**
+     * Destination where the producer will be sending message to
+     */
+    private Destination _pingDestination;
+
+    /**
+     * Determines whether this producer sends persistent messages from the run method.
+     */
+    protected boolean _persistent;
+
+    /**
+     * Holds the message size to send, from the run method.
+     */
+    protected int _messageSize;
+
+    /**
+     * Used to indicate that the ping loop should print out whenever it pings.
+     */
+    protected boolean _verbose = false;
+
+    protected Session _consumerSession;
+
+    /**
+     * Used to restrict the sending rate to a specified limit.
+     */
+    private Throttle rateLimiter = null;
+
+    /**
+     * The throttler can only reliably restrict to a few hundred cycles per second, so a throttling batch size is used
+     * to group sends together into batches large enough that the throttler runs slower than that.
+     */
+    int _throttleBatchSize;
+
+    private MessageListener _messageListener = null;
+
+    private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
+                             boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+                             boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
+            throws Exception
+    {
+        // Create a connection to the broker.
+        InetAddress address = InetAddress.getLocalHost();
+        String clientID = address.getHostName() + System.currentTimeMillis();
+
+        setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+
+        // Create transactional or non-transactional sessions, based on the command line arguments.
+        setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
+        _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+        _persistent = persistent;
+        _messageSize = messageSize;
+        _verbose = verbose;
+
+        // Set failover interrupts
+        _failAfterCommit = afterCommit;
+        _failBeforeCommit = beforeCommit;
+        _failAfterSend = afterSend;
+        _failBeforeSend = beforeSend;
+        _failOnce = failOnce;
+        _txBatchSize = batchSize;
+
+        // Calculate a throttling batch size and rate such that the throttle runs slower than 100 cycles per second
+        // and batched sends within each cycle multiply up to give the desired rate.
+        //
+        // total rate = throttle rate * batch size.
+        // 1 < throttle rate < 100
+        // 1 < total rate < 20000
+        if (rate > DEFAULT_RATE)
+        {
+            // Log base 10 over 2 is used here to get a feel for what power of 100 the total rate is.
+            // As the total rate goes up the powers of 100 the batch size goes up by powers of 100 to keep the
+            // throttle rate back into the range 1 to 100.
+            int x = (int) (Math.log10(rate) / 2);
+            _throttleBatchSize = (int) Math.pow(100, x);
+            int throttleRate = rate / _throttleBatchSize;
+
+            _logger.debug("rate = " + rate);
+            _logger.debug("x = " + x);
+            _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
+            _logger.debug("throttleRate = " + throttleRate);
+
+            rateLimiter = new Throttle();
+            rateLimiter.setRate(throttleRate);
+        }
+    }
+
+    /**
+     * Creates a ping pong producer with the specified connection details and type.
+     *
+     * @param brokerDetails
+     * @param username
+     * @param password
+     * @param virtualpath
+     * @param transacted
+     * @throws Exception All allowed to fall through. This is only test code...
+     */
+    public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+                            String destinationName, String selector, boolean transacted, boolean persistent,
+                            int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
+                            boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
+                            int noOfDestinations, int rate, boolean pubsub) throws Exception
+    {
+        this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
+             beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
+
+        _destinationCount = noOfDestinations;
+        setPubSub(pubsub);
+
+        if (noOfDestinations == DEFAULT_DESTINATION_COUNT)
+        {
+            if (destinationName != null)
+            {
+                createPingDestination(destinationName);
+                // Create producer and the consumer
+                createProducer();
+                createConsumer(selector);
+            }
+            else
+            {
+                _logger.error("Destination is not specified");
+                throw new IllegalArgumentException("Destination is not specified");
+            }
+        }
+    }
+
+    private void createPingDestination(String name)
+    {
+        if (isPubSub())
+        {
+            _pingDestination = new AMQTopic(name);
+        }
+        else
+        {
+            _pingDestination = new AMQQueue(name);
+        }
+    }
+
+    /**
+     * Creates the producer to send the pings on.  If the tests are with nultiple-destinations, then producer
+     * is created with null destination, so that any destination can be specified while sending
+     *
+     * @throws JMSException
+     */
+    public void createProducer() throws JMSException
+    {
+        if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
+        {
+            // create producer with initial destination as null for test with multiple-destinations
+            // In this case, a different destination will be used while sending the message
+            _producer = (MessageProducer) getProducerSession().createProducer(null);
+        }
+        else
+        {
+            // Create a producer with known destination to send the pings on.
+            _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
+
+        }
+
+        _producer.setDisableMessageTimestamp(true);
+        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+    }
+
+    /**
+     * Creates the temporary destination to listen to the responses
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumer(String selector) throws JMSException
+    {
+        // Create a temporary destination to get the pongs on.
+        if (isPubSub())
+        {
+            _replyDestination = _consumerSession.createTemporaryTopic();
+        }
+        else
+        {
+            _replyDestination = _consumerSession.createTemporaryQueue();
+        }
+
+        // Create a message consumer to get the replies with and register this to be called back by it.
+        MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+        consumer.setMessageListener(this);
+    }
+
+    /**
+     * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumers(String selector) throws JMSException
+    {
+        for (int i = 0; i < getDestinationsCount(); i++)
+        {
+            MessageConsumer consumer =
+                    getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
+            consumer.setMessageListener(this);
+        }
+    }
+
+
+    public Session getConsumerSession()
+    {
+        return _consumerSession;
+    }
+
+    public Destination getPingDestination()
+    {
+        return _pingDestination;
+    }
+
+    protected void setPingDestination(Destination destination)
+    {
+        _pingDestination = destination;
+    }
+
+    /**
+     * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
+     * to be started to bounce the pings back again.
+     * <p/>
+     * <p/>The command line takes from 2 to 4 arguments:
+     * <p/><table>
+     * <tr><td>brokerDetails <td> The broker connection string.
+     * <tr><td>virtualPath   <td> The virtual path.
+     * <tr><td>transacted    <td> A boolean flag, telling this client whether or not to use transactions.
+     * <tr><td>size          <td> The size of ping messages to use, in bytes.
+     * </table>
+     *
+     * @param args The command line arguments as defined above.
+     */
+    public static void main(String[] args) throws Exception
+    {
+        // Extract the command line.
+        Config config = new Config();
+        config.setOptions(args);
+        if (args.length == 0)
+        {
+            _logger.info("Running test with default values...");
+            //usage();
+            //System.exit(0);
+        }
+
+        String brokerDetails = config.getHost() + ":" + config.getPort();
+        String virtualpath = "/test";
+        String selector = config.getSelector();
+        boolean verbose = true;
+        boolean transacted = config.isTransacted();
+        boolean persistent = config.usePersistentMessages();
+        int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
+        //int messageCount = config.getMessages();
+        int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
+        int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE;
+        int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
+        boolean pubsub = config.isPubSub();
+
+        String destName = config.getDestination();
+        if (destName == null)
+        {
+            destName = PING_DESTINATION_NAME;
+        }
+
+        boolean afterCommit = false;
+        boolean beforeCommit = false;
+        boolean afterSend = false;
+        boolean beforeSend = false;
+        boolean failOnce = false;
+
+        for (String arg : args)
+        {
+            if (arg.startsWith("failover:"))
+            {
+                //failover:<before|after>:<send:commit> | failover:once
+                String[] parts = arg.split(":");
+                if (parts.length == 3)
+                {
+                    if (parts[2].equals("commit"))
+                    {
+                        afterCommit = parts[1].equals("after");
+                        beforeCommit = parts[1].equals("before");
+                    }
+
+                    if (parts[2].equals("send"))
+                    {
+                        afterSend = parts[1].equals("after");
+                        beforeSend = parts[1].equals("before");
+                    }
+
+                    if (parts[1].equals("once"))
+                    {
+                        failOnce = true;
+                    }
+                }
+                else
+                {
+                    System.out.println("Unrecognized failover request:" + arg);
+                }
+            }
+        }
+
+        // Create a ping producer to handle the request/wait/reply cycle.
+        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
+                                                             destName, selector, transacted, persistent, messageSize, verbose,
+                                                             afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
+                                                             destCount, rate, pubsub);
+
+        pingProducer.getConnection().start();
+
+        // Run a few priming pings to remove warm up time from test results.
+        //pingProducer.prime(PRIMING_LOOPS);
+        // Create a shutdown hook to terminate the ping-pong producer.
+        Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+        // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+        pingProducer.getConnection().setExceptionListener(pingProducer);
+
+        // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+        Thread pingThread = new Thread(pingProducer);
+        pingThread.run();
+        pingThread.join();
+    }
+
+    private static void usage()
+    {
+        System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" +
+                           "-destinationname : queue/topic name\n" +
+                           "-transacted : (true/false). Default is false\n" +
+                           "-persistent : (true/false). Default is false\n" +
+                           "-pubsub     : (true/false). Default is false\n" +
+                           "-selector   : selector string\n" +
+                           "-payload    : paylaod size. Default is 0\n" +
+                           //"-messages   : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" +
+                           "-destinationscount : no of destinations for multi-destinations test\n" +
+                           "-batchsize  : batch size\n" +
+                           "-rate : thruput rate\n");
+    }
+
+    /**
+     * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
+     * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
+     * this a few times, in order to prime the JVMs JIT compilation.
+     *
+     * @param x The number of priming loops to run.
+     * @throws JMSException All underlying exceptions are allowed to fall through.
+     */
+    public void prime(int x) throws JMSException
+    {
+        for (int i = 0; i < x; i++)
+        {
+            // Create and send a small message.
+            Message first = getTestMessage(_replyDestination, 0, false);
+            sendMessage(first);
+
+            commitTx();
+
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException ignore)
+            {
+
+            }
+        }
+
+
+    }
+
+    /**
+     * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
+     * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected
+     * in the replies map.
+     *
+     * @param message The received message.
+     */
+    public void onMessage(Message message)
+    {
+
+        try
+        {
+
+            // Store the reply, if it has a correlation id that is expected.
+            String correlationID = message.getJMSCorrelationID();
+
+            if (_verbose)
+            {
+                _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
+                //_logger.debug("Received from : " + message.getJMSDestination());
+            }
+
+            // Turn the traffic light to green.
+            CountDownLatch trafficLight = trafficLights.get(correlationID);
+
+            if (trafficLight != null)
+            {
+                if (_messageListener != null)
+                {
+                    synchronized (trafficLight)
+                    {
+                        _messageListener.onMessage(message);
+                        trafficLight.countDown();
+                    }
+                }
+                else
+                {
+                    trafficLight.countDown();
+                }
+
+                _logger.trace("Reply was expected, decrementing the latch for the id.");
+
+                long remainingCount = trafficLight.getCount();
+
+                if ((remainingCount % _txBatchSize) == 0)
+                {
+                    commitTx(getConsumerSession());
+                }
+
+            }
+            else
+            {
+                _logger.trace("There was no thread waiting for reply: " + correlationID);
+            }
+
+            if (_verbose)
+            {
+                Long timestamp = message.getLongProperty("timestamp");
+
+                if (timestamp != null)
+                {
+                    long diff = System.currentTimeMillis() - timestamp;
+                    _logger.trace("Time for round trip: " + diff);
+                }
+            }
+        }
+        catch (JMSException e)
+        {
+            _logger.warn("There was a JMSException: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+     * before a reply arrives, then a null reply is returned from this method.
+     *
+     * @param message  The message to send.
+     * @param numPings The number of ping messages to send.
+     * @param timeout  The timeout in milliseconds.
+     * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+     *         wait for all prematurely.
+     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+     */
+    public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
+    {
+        String messageCorrelationId = null;
+
+        try
+        {
+            // Put a unique correlation id on the message before sending it.
+            messageCorrelationId = Long.toString(getNewID());
+
+            pingNoWaitForReply(message, numPings, messageCorrelationId);
+
+            CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+            // Block the current thread until a reply to the message is received, or it times out.
+            trafficLight.await(timeout, TimeUnit.MILLISECONDS);
+
+            // Work out how many replies were receieved.
+            int numReplies = numPings - (int) trafficLight.getCount();
+
+            if ((numReplies < numPings) && _verbose)
+            {
+                _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+            }
+            else if (_verbose)
+            {
+                _logger.info("Got all replies on id, " + messageCorrelationId);
+            }
+
+            commitTx(getConsumerSession());
+
+            return numReplies;
+        }
+        finally
+        {
+            removeLock(messageCorrelationId);
+        }
+    }
+
+    public long getNewID()
+    {
+        return idGenerator.incrementAndGet();
+    }
+
+    public CountDownLatch removeLock(String correlationID)
+    {
+        return trafficLights.remove(correlationID);
+    }
+
+
+    /*
+    * Sends the specified ping message but does not wait for a correlating reply.
+    *
+    * @param message  The message to send.
+    * @param numPings The number of pings to send.
+    * @return The reply, or null if no reply arrives before the timeout.
+    * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+    */
+    public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
+    {
+        // Create a count down latch to count the number of replies with. This is created before the message is sent
+        // so that the message is not received before the count down is created.
+        CountDownLatch trafficLight = new CountDownLatch(numPings);
+        trafficLights.put(messageCorrelationId, trafficLight);
+
+        message.setJMSCorrelationID(messageCorrelationId);
+
+        // Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
+        // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
+        // needed.
+        boolean committed = false;
+
+        // Send all of the ping messages.
+        for (int i = 0; i < numPings; i++)
+        {
+            // Reset the committed flag to indicate that there are uncommitted message.
+            committed = false;
+
+            // Re-timestamp the message.
+            message.setLongProperty("timestamp", System.currentTimeMillis());
+
+            // Check if the test is with multiple-destinations, in which case round robin the destinations
+            // as the messages are sent.
+            if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
+            {
+                sendMessage(getDestination(i % getDestinationsCount()), message);
+            }
+            else
+            {
+                sendMessage(message);
+            }
+
+            // Apply message rate throttling if a rate limit has been set up and the throttling batch limit has been
+            // reached. See the comment on the throttle batch size for information about the use of batches here.
+            if ((rateLimiter != null) && ((i % _throttleBatchSize) == 0))
+            {
+                rateLimiter.throttle();
+            }
+
+            // Call commit every time the commit batch size is reached.
+            if ((i % _txBatchSize) == 0)
+            {
+                commitTx();
+                committed = true;
+            }
+        }
+
+        // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
+        if (!committed)
+        {
+            commitTx();
+        }
+
+        // Spew out per message timings only in verbose mode.
+        if (_verbose)
+        {
+            _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+        }
+
+    }
+
+    /**
+     * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
+     * waits for replies and inserts short pauses in between each.
+     */
+    public void pingLoop()
+    {
+        try
+        {
+            // Generate a sample message and time stamp it.
+            ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent);
+            msg.setLongProperty("timestamp", System.currentTimeMillis());
+
+            // Send the message and wait for a reply.
+            pingAndWaitForReply(msg, DEFAULT_BATCH_SIZE, TIMEOUT);
+
+            // Introduce a short pause if desired.
+            pause(SLEEP_TIME);
+        }
+        catch (JMSException e)
+        {
+            _publish = false;
+            _logger.debug("There was a JMSException: " + e.getMessage(), e);
+        }
+        catch (InterruptedException e)
+        {
+            _publish = false;
+            _logger.debug("There was an interruption: " + e.getMessage(), e);
+        }
+    }
+
+    public Destination getReplyDestination()
+    {
+        return _replyDestination;
+    }
+
+    protected void setReplyDestination(Destination destination)
+    {
+        _replyDestination = destination;
+    }
+
+    public void setMessageListener(MessageListener messageListener)
+    {
+        _messageListener = messageListener;
+    }
+
+    public CountDownLatch getEndLock(String correlationID)
+    {
+        return trafficLights.get(correlationID);
+    }
+
+    /*
+    * When the test is being performed with multiple queues, then this method will be used, which has a loop to
+    * pick up the next queue from the queues list and sends message to it.
+    *
+    * @param message
+    * @param numPings
+    * @throws JMSException
+    */
+    /*private void pingMultipleQueues(Message message, int numPings) throws JMSException
+    {
+        int queueIndex = 0;
+        for (int i = 0; i < numPings; i++)
+        {
+            // Re-timestamp the message.
+            message.setLongProperty("timestamp", System.currentTimeMillis());
+
+            sendMessage(getDestination(queueIndex++), message);
+
+            // reset the counter to get the first queue
+            if (queueIndex == (getDestinationsCount() - 1))
+            {
+                queueIndex = 0;
+            }
+        }
+    }*/
+
+    /**
+     * A connection listener that logs out any failover complete events. Could do more interesting things with this
+     * at some point...
+     */
+    public static class FailoverNotifier implements ConnectionListener
+    {
+        public void bytesSent(long count)
+        {
+        }
+
+        public void bytesReceived(long count)
+        {
+        }
+
+        public boolean preFailover(boolean redirect)
+        {
+            return true; //Allow failover
+        }
+
+        public boolean preResubscribe()
+        {
+            return true; // Allow resubscription
+        }
+
+        public void failoverComplete()
+        {
+            _logger.info("App got failover complete callback.");
+        }
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java Fri Jan 26 00:52:20 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,11 +21,11 @@
 package org.apache.qpid.requestreply;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.Session;
 import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.jms.Session;
 import org.apache.qpid.url.URLSyntaxException;
 
 import javax.jms.*;
@@ -42,12 +42,22 @@
 
     private AMQConnection _connection;
 
+    private Session _session;
+    private Session _producerSession;
+
+    private boolean _isTransactional;
+
     public ServiceProvidingClient(String brokerDetails, String username, String password,
-                                  String clientName, String virtualPath, String serviceName)
+                                  String clientName, String virtualPath, String serviceName,
+                                  final int deliveryMode, boolean transactedMode, String selector)
             throws AMQException, JMSException, URLSyntaxException
     {
-        _connection = new AMQConnection(brokerDetails, username, password,
-                                        clientName, virtualPath);
+        _isTransactional = transactedMode;
+
+        _logger.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent")
+                     + "\t isTransactional: " + _isTransactional);
+
+        _connection = new AMQConnection(brokerDetails, username, password, clientName, virtualPath);
         _connection.setConnectionListener(new ConnectionListener()
         {
 
@@ -74,14 +84,15 @@
                 _logger.info("App got failover complete callback");
             }
         });
-        final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
+        _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
 
         _logger.info("Service (queue) name is '" + serviceName + "'...");
 
         AMQQueue destination = new AMQQueue(serviceName);
 
-        MessageConsumer consumer = session.createConsumer(destination,
-                                                          100, true, false, null);
+        MessageConsumer consumer = _session.createConsumer(destination,
+                                                           100, true, false, selector);
 
         consumer.setMessageListener(new MessageListener()
         {
@@ -90,9 +101,7 @@
             public void onMessage(Message message)
             {
                 //_logger.info("Got message '" + message + "'");
-
                 TextMessage tm = (TextMessage) message;
-
                 try
                 {
                     Destination responseDest = tm.getJMSReplyTo();
@@ -107,9 +116,9 @@
                         _responseDest = responseDest;
 
                         _logger.info("About to create a producer");
-                        _destinationProducer = session.createProducer(responseDest);
+                        _destinationProducer = _producerSession.createProducer(responseDest);
                         _destinationProducer.setDisableMessageTimestamp(true);
-                        _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                        _destinationProducer.setDeliveryMode(deliveryMode);
                         _logger.info("After create a producer");
                     }
                 }
@@ -127,14 +136,25 @@
                 try
                 {
                     String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
-                    TextMessage msg = session.createTextMessage(payload);
+                    TextMessage msg = _producerSession.createTextMessage(payload);
                     if (tm.propertyExists("timeSent"))
                     {
                         _logger.info("timeSent property set on message");
-                        _logger.info("timeSent value is: " + tm.getLongProperty("timeSent"));
-                        msg.setStringProperty("timeSent", tm.getStringProperty("timeSent"));
+                        long timesent = tm.getLongProperty("timeSent");
+                        _logger.info("timeSent value is: " + timesent);
+                        msg.setLongProperty("timeSent", timesent);
                     }
+                    
                     _destinationProducer.send(msg);
+
+                    if (_isTransactional)
+                    {
+                        _producerSession.commit();
+                    }
+                    if (_isTransactional)
+                    {
+                        _session.commit();
+                    }
                     if (_messageCount % 1000 == 0)
                     {
                         _logger.info("Sent response to '" + _responseDest + "'");
@@ -160,7 +180,7 @@
 
         if (args.length < 5)
         {
-            System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]");
+            System.out.println("Usage: serviceProvidingClient <brokerDetails> <username> <password> <virtual-path> <serviceQueue> [<P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]>] [selector]");
             System.exit(1);
         }
         String clientId = null;
@@ -174,10 +194,28 @@
             _logger.error("Error: " + e, e);
         }
 
+        int deliveryMode = DeliveryMode.NON_PERSISTENT;
+        boolean transactedMode = false;
+
+        if (args.length > 7)
+        {
+            deliveryMode = args[args.length - 2].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
+                           : DeliveryMode.NON_PERSISTENT;
+
+            transactedMode = args[args.length - 1].toUpperCase().charAt(0) == 'T' ? true : false;
+        }
+
+        String selector = null;
+        if ((args.length == 8) || (args.length == 7))
+        {
+            selector = args[args.length - 1];
+        }
+
         try
         {
             ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2],
-                                                                       clientId, args[3], args[4]);
+                                                                       clientId, args[3], args[4],
+                                                                       deliveryMode, transactedMode, selector);
             client.run();
         }
         catch (JMSException e)
@@ -192,10 +230,6 @@
         {
             _logger.error("Error: " + e, e);
         }
-
-
-
     }
-
 }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java Fri Jan 26 00:52:20 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,13 +22,15 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
+import org.apache.qpid.url.URLSyntaxException;
 
 import javax.jms.*;
 import java.net.InetAddress;
@@ -38,26 +40,33 @@
  * A client that behaves as follows:
  * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
  * <li>Creates a temporary queue</li>
- * <li>Creates messages containing a property that is the name of the temporary queue</li>
- * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * <li>Creates messages containing a property(reply-to) that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and registers the callbackHandler to listen to the response on the temporary queue</li>
+ * <li>Start the loop to send all messages</li>
+ * <li>CallbackHandler keeps listening to the responses and exits if all the messages have been received back or
+ *  if the waiting time for next message is elapsed</li>
  * </ul>
- *
  */
 public class ServiceRequestingClient implements ExceptionListener
 {
     private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
 
-    private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk  ";
+    private long _messageIdentifier = 0;
+
+    // time for which callbackHandler should wait for a message before exiting. Default time= 60 secs
+    private static long _callbackHandlerWaitingTime = 60000;
 
     private String MESSAGE_DATA;
 
     private AMQConnection _connection;
 
     private Session _session;
+    private Session _producerSession;
 
     private long _averageLatency;
 
     private int _messageCount;
+    private boolean _isTransactional;
 
     private volatile boolean _completed;
 
@@ -67,40 +76,25 @@
 
     private Object _waiter;
 
-    private static String createMessagePayload(int size)
-    {
-        _log.info("Message size set to " + size + " bytes");
-        StringBuffer buf = new StringBuffer(size);
-        int count = 0;
-        while (count < size + MESSAGE_DATA_BYTES.length())
-        {
-            buf.append(MESSAGE_DATA_BYTES);
-            count += MESSAGE_DATA_BYTES.length();
-        }
-        if (count < size)
-        {
-            buf.append(MESSAGE_DATA_BYTES, 0, size - count);
-        }
-
-        return buf.toString();
-    }
-
     private class CallbackHandler implements MessageListener
     {
-        private int _expectedMessageCount;
-
         private int _actualMessageCount;
 
         private long _startTime;
+        // The time when the last message was received by the callbackHandler
+        private long _messageReceivedTime = 0;
+        private Object _timerCallbackHandler = new Object();
 
-        public CallbackHandler(int expectedMessageCount, long startTime)
+        public CallbackHandler(long startTime)
         {
-            _expectedMessageCount = expectedMessageCount;
             _startTime = startTime;
+            // Start the timer thread, which will keep checking if test should exit because the waiting time has elapsed
+            (new Thread(new TimerThread())).start();
         }
 
         public void onMessage(Message m)
         {
+            _messageReceivedTime = System.currentTimeMillis();
             if (_log.isDebugEnabled())
             {
                 _log.debug("Message received: " + m);
@@ -110,20 +104,23 @@
                 m.getPropertyNames();
                 if (m.propertyExists("timeSent"))
                 {
-                    long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
-                    long now = System.currentTimeMillis();
+                    long timeSent = m.getLongProperty("timeSent");
                     if (_averageLatency == 0)
                     {
-                        _averageLatency = now - timeSent;
+                        _averageLatency = _messageReceivedTime - timeSent;
                         _log.info("Latency " + _averageLatency);
                     }
                     else
                     {
-                        _log.info("Individual latency: " + (now - timeSent));
-                        _averageLatency = (_averageLatency + (now - timeSent)) / 2;
+                        _log.info("Individual latency: " + (_messageReceivedTime - timeSent));
+                        _averageLatency = (_averageLatency + (_messageReceivedTime - timeSent)) / 2;
                         _log.info("Average latency now: " + _averageLatency);
                     }
                 }
+                if(_isTransactional)
+                {
+                    _session.commit();
+                }
             }
             catch (JMSException e)
             {
@@ -135,26 +132,114 @@
                 _log.info("Received message count: " + _actualMessageCount);
             }
 
-            if (_actualMessageCount == _expectedMessageCount)
+            checkForMessageID(m);
+
+            if (_actualMessageCount == _messageCount)
             {
-                _completed = true;
-                notifyWaiter();
-                long timeTaken = System.currentTimeMillis() - _startTime;
-                _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
-                          timeTaken + "ms, equivalent to " +
-                          (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+                finishTesting(_actualMessageCount);
+            }
+        }
+
+        /**
+         * sets completed flag to true, closes the callbackHandler connection and notifies the waiter thread,
+         * so that the callbackHandler can finish listening for messages. This causes the test to finish.
+         * @param receivedMessageCount
+         */
+        private void finishTesting(int receivedMessageCount)
+        {
+            _completed = true;
+            notifyWaiter();
+            notifyTimerThread();
+
+            long timeTaken = System.currentTimeMillis() - _startTime;
+            _log.info("***** Result *****");
+            _log.info("Total messages received = " + receivedMessageCount);
+            _log.info("Total time taken to receive " + receivedMessageCount + " messages was " +
+                      timeTaken + "ms, equivalent to " +
+                      (receivedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+            try
+            {
+                _connection.close();
+                _log.info("Connection closed");
+            }
+            catch (JMSException e)
+            {
+                _log.error("Error closing connection");
+            }
+        }
 
-                try
+        private void notifyTimerThread()
+        {
+            if (_timerCallbackHandler != null)
+            {
+                synchronized (_timerCallbackHandler)
                 {
-                    _connection.close();
-                    _log.info("Connection closed");
+                    _timerCallbackHandler.notify();
                 }
-                catch (JMSException e)
+            }
+        }
+
+        /**
+         * Thread class implementing the timer for callbackHandler. The thread will exit the test if the waiting time
+         * has elapsed before next message is received.
+         */
+        private class TimerThread implements Runnable
+        {
+            public void run()
+            {
+                do
                 {
-                    _log.error("Error closing connection");
+                    try
+                    {
+                        synchronized(_timerCallbackHandler)
+                        {
+                            _timerCallbackHandler.wait(_callbackHandlerWaitingTime);
+                        }
+                    }
+                    catch (InterruptedException ignore)
+                    {
+
+                    }
+
+                    // exit if callbackHandler has received all messages
+                    if (_completed)
+                    {
+                        return;
+                    }
                 }
+                while ((System.currentTimeMillis() - _messageReceivedTime) < _callbackHandlerWaitingTime);
+
+                // waiting time has elapsed, so exit the test
+                _log.info("");
+                _log.info("Exited after waiting for " + _callbackHandlerWaitingTime/1000 + " secs");
+                finishTesting(_actualMessageCount);
             }
         }
+    } // end of CallbackHandler class
+
+    /**
+     * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID
+     * of previous message.
+     * @param receivedMsg
+     */
+    private void checkForMessageID(Message receivedMsg)
+    {
+        try
+        {
+            JMSTextMessage msg = (JMSTextMessage)receivedMsg;
+            if (! (msg.getDeliveryTag() == _messageIdentifier + 1))
+            {
+                _log.info("Out of sequence message received. Previous AMQ MessageID= " + _messageIdentifier +
+                          ", Received AMQ messageID= " + receivedMsg.getJMSMessageID());
+            }
+            _messageIdentifier = msg.getDeliveryTag();
+        }
+        catch (Exception ex)
+        {
+            _log.error("Error in checking messageID ", ex);
+        }
+
     }
 
     private void notifyWaiter()
@@ -167,25 +252,31 @@
             }
         }
     }
+
     public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
                                    String vpath, String commandQueueName,
+                                   int deliveryMode, boolean transactedMode,
                                    final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
     {
+        _isTransactional = transactedMode;
+
+        _log.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent"));
+        _log.info("isTransactional: " + _isTransactional);
+
         _messageCount = messageCount;
-        MESSAGE_DATA = createMessagePayload(messageDataLength);
+        MESSAGE_DATA = TestMessageFactory.createMessagePayload(messageDataLength);
         try
         {
             createConnection(brokerHosts, clientID, username, password, vpath);
-            _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+            _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
+            _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
 
             _connection.setExceptionListener(this);
 
-
             AMQQueue destination = new AMQQueue(commandQueueName);
-            _producer = (MessageProducer) _session.createProducer(destination);
+            _producer = (MessageProducer) _producerSession.createProducer(destination);
             _producer.setDisableMessageTimestamp(true);
-            _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            _producer.setDeliveryMode(deliveryMode);
 
             _tempDestination = new AMQQueue("TempResponse" +
                                             Long.toString(System.currentTimeMillis()), true);
@@ -195,7 +286,11 @@
             //Send first message, then wait a bit to allow the provider to get initialised
             TextMessage first = _session.createTextMessage(MESSAGE_DATA);
             first.setJMSReplyTo(_tempDestination);
-            _producer.send(first);
+             _producer.send(first);
+            if (_isTransactional)
+            {
+                _producerSession.commit();
+            }
             try
             {
                 Thread.sleep(1000);
@@ -207,7 +302,7 @@
             //now start the clock and the test...
             final long startTime = System.currentTimeMillis();
 
-            messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+            messageConsumer.setMessageListener(new CallbackHandler(startTime));
         }
         catch (JMSException e)
         {
@@ -217,6 +312,7 @@
 
     /**
      * Run the test and notify an object upon receipt of all responses.
+     *
      * @param waiter the object that will be notified
      * @throws JMSException
      */
@@ -226,14 +322,19 @@
         _connection.start();
         for (int i = 1; i < _messageCount; i++)
         {
-            TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
+            TextMessage msg = _producerSession.createTextMessage(MESSAGE_DATA + i);
             msg.setJMSReplyTo(_tempDestination);
             if (i % 1000 == 0)
             {
                 long timeNow = System.currentTimeMillis();
-                msg.setStringProperty("timeSent", String.valueOf(timeNow));
+                msg.setLongProperty("timeSent", timeNow);
+            }
+             _producer.send(msg);
+            if (_isTransactional)
+            {
+                _producerSession.commit();
             }
-            _producer.send(msg);
+
         }
         _log.info("Finished sending " + _messageCount + " messages");
     }
@@ -246,8 +347,7 @@
     private void createConnection(String brokerHosts, String clientID, String username, String password,
                                   String vpath) throws AMQException, URLSyntaxException
     {
-        _connection = new AMQConnection(brokerHosts, username, password,
-                                        clientID, vpath);
+        _connection = new AMQConnection(brokerHosts, username, password, clientID, vpath);
     }
 
     /**
@@ -256,22 +356,48 @@
      */
     public static void main(String[] args)
     {
-        if (args.length < 6)
+        if ((args.length < 6) || (args.length == 8))
         {
-            System.err.println(
-                    "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
+            System.err.println("Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> " +
+                    "<command queue name> <number of messages> [<message size>] " +
+                    "[<P[ersistent]|N[onPersistent] (Default N)>  <T[ransacted]|N[onTransacted] (Default N)>] " +
+                    "[<waiting time for response in sec (default 60 sec)>]");
+            System.exit(1);
         }
         try
         {
-            int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;
+            int messageSize = 4096;
+            boolean transactedMode = false;
+            int deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+            if (args.length > 6)
+            {
+                messageSize = Integer.parseInt(args[6]);
+            }
+            if (args.length > 7)
+            {
+                deliveryMode = args[7].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
+                               : DeliveryMode.NON_PERSISTENT;
+
+                transactedMode = args[8].toUpperCase().charAt(0) == 'T' ? true : false;
+            }
+
+            if (args.length > 9)
+            {
+                _callbackHandlerWaitingTime = Long.parseLong(args[9]) * 1000;
+            }
+
+            _log.info("Each message size = " + messageSize + " bytes");
 
             InetAddress address = InetAddress.getLocalHost();
             String clientID = address.getHostName() + System.currentTimeMillis();
             ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
-                                                                         args[4], Integer.parseInt(args[5]),
-                                                                         messageDataLength);
+                                                                         args[4], deliveryMode, transactedMode, Integer.parseInt(args[5]),
+                                                                         messageSize);
             Object waiter = new Object();
             client.run(waiter);
+
+            // Start a thread to
             synchronized (waiter)
             {
                 while (!client.isCompleted())
@@ -279,7 +405,6 @@
                     waiter.wait();
                 }
             }
-
         }
         catch (UnknownHostException e)
         {
@@ -292,7 +417,7 @@
         }
     }
 
-     /**
+    /**
      * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
      */
     public void onException(JMSException e)

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java Fri Jan 26 00:52:20 2007
@@ -22,14 +22,12 @@
 
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.config.ConnectorConfig;
-import org.apache.qpid.config.ConnectionFactoryInitialiser;
 import org.apache.qpid.config.Connector;
 import org.apache.qpid.config.AbstractConfig;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 
-class Config extends AbstractConfig implements ConnectorConfig
+public class Config extends AbstractConfig implements ConnectorConfig
 {
 
     private String host = "localhost";
@@ -45,23 +43,30 @@
     private int ackMode= AMQSession.NO_ACKNOWLEDGE;
     private String clientId;
     private String subscriptionId;
+    private String selector;
+    private String destinationName;
     private boolean persistent;
+    private boolean transacted;
+    private int destinationsCount;
+    private int batchSize;
+    private int rate;
+    private boolean ispubsub;
 
     public Config()
     {
     }
 
-    int getAckMode()
+    public int getAckMode()
     {
         return ackMode;
     }
 
-    void setPayload(int payload)
+    public void setPayload(int payload)
     {
         this.payload = payload;
     }
 
-    int getPayload()
+    public int getPayload()
     {
         return payload;
     }
@@ -81,11 +86,26 @@
         this.messages = messages;
     }
 
-    int getMessages()
+    public int getMessages()
     {
         return messages;
     }
 
+    public int getBatchSize()
+    {
+        return batchSize;
+    }
+
+    public int getRate()
+    {
+        return rate;
+    }
+
+    public int getDestinationsCount()
+    {
+        return destinationsCount;
+    }
+
     public String getHost()
     {
         return host;
@@ -141,21 +161,41 @@
         this.delay = delay;
     }
 
-    String getClientId()
+    public String getClientId()
     {
         return clientId;
     }
 
-    String getSubscriptionId()
+    public String getSubscriptionId()
     {
         return subscriptionId;
     }
 
-    boolean usePersistentMessages()
+    public String getSelector()
+    {
+        return selector;
+    }
+
+    public String getDestination()
+    {
+        return destinationName;
+    }
+
+    public boolean usePersistentMessages()
     {
         return persistent;
     }
 
+    public boolean isTransacted()
+    {
+        return transacted;
+    }
+
+    public boolean isPubSub()
+    {
+        return ispubsub;
+    }
+
     public void setOption(String key, String value)
     {
         if("-host".equalsIgnoreCase(key))
@@ -216,6 +256,34 @@
         else if("-persistent".equalsIgnoreCase(key))
         {
             persistent = "true".equalsIgnoreCase(value);
+        }
+        else if("-transacted".equalsIgnoreCase(key))
+        {
+            transacted = "true".equalsIgnoreCase(value);
+        }
+        else if ("-destinationscount".equalsIgnoreCase(key))
+        {
+            destinationsCount = parseInt("Bad destinations count", value);
+        }
+        else if ("-batchsize".equalsIgnoreCase(key))
+        {
+            batchSize = parseInt("Bad batch size", value);
+        }
+        else if ("-rate".equalsIgnoreCase(key))
+        {
+            rate = parseInt("MEssage rate", value);
+        }
+        else if("-pubsub".equalsIgnoreCase(key))
+        {
+            ispubsub = "true".equalsIgnoreCase(value);
+        }
+        else if("-selector".equalsIgnoreCase(key))
+        {
+            selector = value;
+        }
+        else if("-destinationname".equalsIgnoreCase(key))
+        {
+            destinationName = value;
         }
         else
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java?view=diff&rev=500188&r1=500187&r2=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Publisher.java Fri Jan 26 00:52:20 2007
@@ -51,7 +51,7 @@
         _factory.createControlConsumer().setMessageListener(this);
         _connection.start();
 
-        if(warmup > 0)
+        if (warmup > 0)
         {
             System.out.println("Runing warmup (" + warmup + " msgs)");
             long time = batch(warmup, consumerCount);
@@ -59,11 +59,14 @@
         }
 
         long[] times = new long[batches];
-        for(int i = 0; i < batches; i++)
+        for (int i = 0; i < batches; i++)
         {
-            if(i > 0) Thread.sleep(delay*1000);
+            if (i > 0)
+            {
+                Thread.sleep(delay * 1000);
+            }
             times[i] = batch(msgCount, consumerCount);
-            System.out.println("Batch " + (i+1) + " of " + batches + " completed in " + times[i] + " ms.");
+            System.out.println("Batch " + (i + 1) + " of " + batches + " completed in " + times[i] + " ms.");
         }
 
         long min = min(times);
@@ -131,7 +134,7 @@
     static long min(long[] times)
     {
         long min = times.length > 0 ? times[0] : 0;
-        for(int i = 0; i < times.length; i++)
+        for (int i = 0; i < times.length; i++)
         {
             min = Math.min(min, times[i]);
         }
@@ -141,7 +144,7 @@
     static long max(long[] times)
     {
         long max = times.length > 0 ? times[0] : 0;
-        for(int i = 0; i < times.length; i++)
+        for (int i = 0; i < times.length; i++)
         {
             max = Math.max(max, times[i]);
         }
@@ -151,14 +154,22 @@
     static long avg(long[] times, long min, long max)
     {
         long sum = 0;
-        for(int i = 0; i < times.length; i++)
+        for (int i = 0; i < times.length; i++)
         {
             sum += times[i];
         }
-        sum -= min;
-        sum -= max;
 
-        return (sum / (times.length - 2));
+        int adjustment = 0;
+
+        // Remove min and max if we have run enough batches.
+        if (times.length > 2)
+        {
+            sum -= min;
+            sum -= max;
+            adjustment = 2;
+        }
+
+        return (sum / (times.length - adjustment));
     }
 
     public static void main(String[] argv) throws Exception

Added: incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j?view=auto&rev=500188
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j (added)
+++ incubator/qpid/branches/perftesting/qpid/java/perftests/src/main/java/perftests.log4j Fri Jan 26 00:52:20 2007
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
+log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}
+
+
+log4j.logger.uk.co.thebadgerset.junit.extensions=info, console
+log4j.additivity.uk.co.thebadgerset.junit.extensions=false
+log4j.logger.uk.co.thebadgerset.junit.extensions=info
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+
+#log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
+log4j.appender.console.layout.ConversionPattern=%p [%c] %m%n
+
+log4j.appender.fileApp=org.apache.log4j.FileAppender
+log4j.appender.fileApp.file=${log.dir}/perftests.volumetest.log
+log4j.appender.fileApp.Threshold=info
+log4j.appender.fileApp.append=false
+log4j.appender.fileApp.layout=org.apache.log4j.PatternLayout