You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/01 15:30:31 UTC

svn commit: r491577 - in /incubator/qpid/trunk/qpid/java/perftests: bin/ src/main/java/org/apache/qpid/requestreply/

Author: rgreig
Date: Mon Jan  1 06:30:31 2007
New Revision: 491577

URL: http://svn.apache.org/viewvc?view=rev&rev=491577
Log:
QPID-232 Added the service request/reply test

Added:
    incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh   (with props)
    incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh   (with props)
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java   (with props)
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java   (with props)

Added: incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh (added)
+++ incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh Mon Jan  1 06:30:31 2007
@@ -0,0 +1,25 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo $CP
+# usage: just pass in the host(s)
+$JAVA_HOME/bin/java -cp $CP org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test serviceQ

Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh (added)
+++ incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh Mon Jan  1 06:30:31 2007
@@ -0,0 +1,27 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# args supplied: <host:port> <num messages>
+thehosts=$1
+shift
+echo $thehosts
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo $CP
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="INFO" org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest /test serviceQ "$@"

Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java Mon Jan  1 06:30:31 2007
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ServiceProvidingClient
+{
+    private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class);
+
+    private MessageProducer _destinationProducer;
+
+    private Destination _responseDest;
+
+    private AMQConnection _connection;
+
+    public ServiceProvidingClient(String brokerDetails, String username, String password,
+                                  String clientName, String virtualPath, String serviceName)
+            throws AMQException, JMSException, URLSyntaxException
+    {
+        _connection = new AMQConnection(brokerDetails, username, password,
+                                        clientName, virtualPath);
+        _connection.setConnectionListener(new ConnectionListener()
+        {
+
+            public void bytesSent(long count)
+            {
+            }
+
+            public void bytesReceived(long count)
+            {
+            }
+
+            public boolean preFailover(boolean redirect)
+            {
+                return true;
+            }
+
+            public boolean preResubscribe()
+            {
+                return true;
+            }
+
+            public void failoverComplete()
+            {
+                _logger.info("App got failover complete callback");
+            }
+        });
+        final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _logger.info("Service (queue) name is '" + serviceName + "'...");
+
+        AMQQueue destination = new AMQQueue(serviceName);
+
+        MessageConsumer consumer = session.createConsumer(destination,
+                                                          100, true, false, null);
+
+        consumer.setMessageListener(new MessageListener()
+        {
+            private int _messageCount;
+
+            public void onMessage(Message message)
+            {
+                //_logger.info("Got message '" + message + "'");
+
+                TextMessage tm = (TextMessage) message;
+
+                try
+                {
+                    Destination responseDest = tm.getJMSReplyTo();
+                    if (responseDest == null)
+                    {
+                        _logger.info("Producer not created because the response destination is null.");
+                        return;
+                    }
+
+                    if (!responseDest.equals(_responseDest))
+                    {
+                        _responseDest = responseDest;
+
+                        _logger.info("About to create a producer");
+                        _destinationProducer = session.createProducer(responseDest);
+                        _destinationProducer.setDisableMessageTimestamp(true);
+                        _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                        _logger.info("After create a producer");
+                    }
+                }
+                catch (JMSException e)
+                {
+                    _logger.error("Error creating destination");
+                }
+                _messageCount++;
+                if (_messageCount % 1000 == 0)
+                {
+                    _logger.info("Received message total: " + _messageCount);
+                    _logger.info("Sending response to '" + _responseDest + "'");
+                }
+
+                try
+                {
+                    String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
+                    TextMessage msg = session.createTextMessage(payload);
+                    if (tm.propertyExists("timeSent"))
+                    {
+                        _logger.info("timeSent property set on message");
+                        _logger.info("timeSent value is: " + tm.getLongProperty("timeSent"));
+                        msg.setStringProperty("timeSent", tm.getStringProperty("timeSent"));
+                    }
+                    _destinationProducer.send(msg);
+                    if (_messageCount % 1000 == 0)
+                    {
+                        _logger.info("Sent response to '" + _responseDest + "'");
+                    }
+                }
+                catch (JMSException e)
+                {
+                    _logger.error("Error sending message: " + e, e);
+                }
+            }
+        });
+    }
+
+    public void run() throws JMSException
+    {
+        _connection.start();
+        _logger.info("Waiting...");
+    }
+
+    public static void main(String[] args)
+    {
+        _logger.info("Starting...");
+
+        if (args.length < 5)
+        {
+            System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]");
+            System.exit(1);
+        }
+        String clientId = null;
+        try
+        {
+            InetAddress address = InetAddress.getLocalHost();
+            clientId = address.getHostName() + System.currentTimeMillis();
+        }
+        catch (UnknownHostException e)
+        {
+            _logger.error("Error: " + e, e);
+        }
+
+        try
+        {
+            ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2],
+                                                                       clientId, args[3], args[4]);
+            client.run();
+        }
+        catch (JMSException e)
+        {
+            _logger.error("Error: " + e, e);
+        }
+        catch (AMQException e)
+        {
+            _logger.error("Error: " + e, e);
+        }
+        catch (URLSyntaxException e)
+        {
+            _logger.error("Error: " + e, e);
+        }
+
+
+
+    }
+
+}
+

Propchange: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java Mon Jan  1 06:30:31 2007
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jms.MessageConsumer;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that behaves as follows:
+ * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
+ * <li>Creates a temporary queue</li>
+ * <li>Creates messages containing a property that is the name of the temporary queue</li>
+ * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
+ * </ul>
+ *
+ */
+public class ServiceRequestingClient implements ExceptionListener
+{
+    private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
+
+    private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk  ";
+
+    private String MESSAGE_DATA;
+
+    private AMQConnection _connection;
+
+    private Session _session;
+
+    private long _averageLatency;
+
+    private int _messageCount;
+
+    private volatile boolean _completed;
+
+    private AMQDestination _tempDestination;
+
+    private MessageProducer _producer;
+
+    private Object _waiter;
+
+    private static String createMessagePayload(int size)
+    {
+        _log.info("Message size set to " + size + " bytes");
+        StringBuffer buf = new StringBuffer(size);
+        int count = 0;
+        while (count < size + MESSAGE_DATA_BYTES.length())
+        {
+            buf.append(MESSAGE_DATA_BYTES);
+            count += MESSAGE_DATA_BYTES.length();
+        }
+        if (count < size)
+        {
+            buf.append(MESSAGE_DATA_BYTES, 0, size - count);
+        }
+
+        return buf.toString();
+    }
+
+    private class CallbackHandler implements MessageListener
+    {
+        private int _expectedMessageCount;
+
+        private int _actualMessageCount;
+
+        private long _startTime;
+
+        public CallbackHandler(int expectedMessageCount, long startTime)
+        {
+            _expectedMessageCount = expectedMessageCount;
+            _startTime = startTime;
+        }
+
+        public void onMessage(Message m)
+        {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Message received: " + m);
+            }
+            try
+            {
+                m.getPropertyNames();
+                if (m.propertyExists("timeSent"))
+                {
+                    long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
+                    long now = System.currentTimeMillis();
+                    if (_averageLatency == 0)
+                    {
+                        _averageLatency = now - timeSent;
+                        _log.info("Latency " + _averageLatency);
+                    }
+                    else
+                    {
+                        _log.info("Individual latency: " + (now - timeSent));
+                        _averageLatency = (_averageLatency + (now - timeSent)) / 2;
+                        _log.info("Average latency now: " + _averageLatency);
+                    }
+                }
+            }
+            catch (JMSException e)
+            {
+                _log.error("Error getting latency data: " + e, e);
+            }
+            _actualMessageCount++;
+            if (_actualMessageCount % 1000 == 0)
+            {
+                _log.info("Received message count: " + _actualMessageCount);
+            }
+
+            if (_actualMessageCount == _expectedMessageCount)
+            {
+                _completed = true;
+                notifyWaiter();
+                long timeTaken = System.currentTimeMillis() - _startTime;
+                _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
+                          timeTaken + "ms, equivalent to " +
+                          (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+                try
+                {
+                    _connection.close();
+                    _log.info("Connection closed");
+                }
+                catch (JMSException e)
+                {
+                    _log.error("Error closing connection");
+                }
+            }
+        }
+    }
+
+    private void notifyWaiter()
+    {
+        if (_waiter != null)
+        {
+            synchronized (_waiter)
+            {
+                _waiter.notify();
+            }
+        }
+    }
+    public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
+                                   String vpath, String commandQueueName,
+                                   final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
+    {
+        _messageCount = messageCount;
+        MESSAGE_DATA = createMessagePayload(messageDataLength);
+        try
+        {
+            createConnection(brokerHosts, clientID, username, password, vpath);
+            _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+            _connection.setExceptionListener(this);
+
+
+            AMQQueue destination = new AMQQueue(commandQueueName);
+            _producer = (MessageProducer) _session.createProducer(destination);
+            _producer.setDisableMessageTimestamp(true);
+            _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            _tempDestination = new AMQQueue("TempResponse" +
+                                            Long.toString(System.currentTimeMillis()), true);
+            MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true,
+                                                                                        true, null);
+
+            //Send first message, then wait a bit to allow the provider to get initialised
+            TextMessage first = _session.createTextMessage(MESSAGE_DATA);
+            first.setJMSReplyTo(_tempDestination);
+            _producer.send(first);
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch (InterruptedException ignore)
+            {
+            }
+
+            //now start the clock and the test...
+            final long startTime = System.currentTimeMillis();
+
+            messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
+
+    /**
+     * Run the test and notify an object upon receipt of all responses.
+     * @param waiter the object that will be notified
+     * @throws JMSException
+     */
+    public void run(Object waiter) throws JMSException
+    {
+        _waiter = waiter;
+        _connection.start();
+        for (int i = 1; i < _messageCount; i++)
+        {
+            TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
+            msg.setJMSReplyTo(_tempDestination);
+            if (i % 1000 == 0)
+            {
+                long timeNow = System.currentTimeMillis();
+                msg.setStringProperty("timeSent", String.valueOf(timeNow));
+            }
+            _producer.send(msg);
+        }
+        _log.info("Finished sending " + _messageCount + " messages");
+    }
+
+    public boolean isCompleted()
+    {
+        return _completed;
+    }
+
+    private void createConnection(String brokerHosts, String clientID, String username, String password,
+                                  String vpath) throws AMQException, URLSyntaxException
+    {
+        _connection = new AMQConnection(brokerHosts, username, password,
+                                        clientID, vpath);
+    }
+
+    /**
+     * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
+     *             means the server will allocate a name.
+     */
+    public static void main(String[] args)
+    {
+        if (args.length < 6)
+        {
+            System.err.println(
+                    "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
+        }
+        try
+        {
+            int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;
+
+            InetAddress address = InetAddress.getLocalHost();
+            String clientID = address.getHostName() + System.currentTimeMillis();
+            ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
+                                                                         args[4], Integer.parseInt(args[5]),
+                                                                         messageDataLength);
+            Object waiter = new Object();
+            client.run(waiter);
+            synchronized (waiter)
+            {
+                while (!client.isCompleted())
+                {
+                    waiter.wait();
+                }
+            }
+
+        }
+        catch (UnknownHostException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+        catch (Exception e)
+        {
+            System.err.println("Error in client: " + e);
+            e.printStackTrace();
+        }
+    }
+
+     /**
+     * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+     */
+    public void onException(JMSException e)
+    {
+        System.err.println(e.getMessage());
+        e.printStackTrace(System.err);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
------------------------------------------------------------------------------
    svn:eol-style = native