You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/01 15:30:31 UTC
svn commit: r491577 - in /incubator/qpid/trunk/qpid/java/perftests: bin/
src/main/java/org/apache/qpid/requestreply/
Author: rgreig
Date: Mon Jan 1 06:30:31 2007
New Revision: 491577
URL: http://svn.apache.org/viewvc?view=rev&rev=491577
Log:
QPID-232 Added the service request/reply test
Added:
incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh (with props)
incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh (with props)
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java (with props)
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java (with props)
Added: incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh (added)
+++ incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh Mon Jan 1 06:30:31 2007
@@ -0,0 +1,25 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo "$CP"
+# usage: pass the broker host(s) as the first argument; user, password,
+# virtual path and service queue are fixed below.
+# Expansions are quoted so values containing spaces or glob characters are
+# passed through intact, and ${1:?...} aborts with a usage message when no
+# host is supplied.
+"$JAVA_HOME/bin/java" -cp "$CP" org.apache.qpid.requestreply.ServiceProvidingClient "${1:?usage: $0 <host:port>[;<host:port>...]}" guest guest /test serviceQ
Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh (added)
+++ incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh Mon Jan 1 06:30:31 2007
@@ -0,0 +1,27 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# args supplied: <host:port> <num messages> [<message size>]
+# ${1:?...} aborts with a usage message when the broker host list is missing,
+# so the following shift never runs on an empty argument list.
+thehosts="${1:?usage: $0 <host:port>[;<host:port>...] <num messages> [<message size>]}"
+shift
+echo "$thehosts"
+# XXX -Xms1024m -XX:NewSize=300m
+. ./setupclasspath.sh
+echo "$CP"
+# Expansions are quoted so values containing spaces or glob characters are
+# passed through intact; the remaining arguments are forwarded unchanged.
+"$JAVA_HOME/bin/java" -cp "$CP" -Damqj.logging.level="INFO" org.apache.qpid.requestreply.ServiceRequestingClient "$thehosts" guest guest /test serviceQ "$@"
Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java Mon Jan 1 06:30:31 2007
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ServiceProvidingClient
+{
+ private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class);
+
+ private MessageProducer _destinationProducer;
+
+ private Destination _responseDest;
+
+ private AMQConnection _connection;
+
+ public ServiceProvidingClient(String brokerDetails, String username, String password,
+ String clientName, String virtualPath, String serviceName)
+ throws AMQException, JMSException, URLSyntaxException
+ {
+ _connection = new AMQConnection(brokerDetails, username, password,
+ clientName, virtualPath);
+ _connection.setConnectionListener(new ConnectionListener()
+ {
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback");
+ }
+ });
+ final Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _logger.info("Service (queue) name is '" + serviceName + "'...");
+
+ AMQQueue destination = new AMQQueue(serviceName);
+
+ MessageConsumer consumer = session.createConsumer(destination,
+ 100, true, false, null);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int _messageCount;
+
+ public void onMessage(Message message)
+ {
+ //_logger.info("Got message '" + message + "'");
+
+ TextMessage tm = (TextMessage) message;
+
+ try
+ {
+ Destination responseDest = tm.getJMSReplyTo();
+ if (responseDest == null)
+ {
+ _logger.info("Producer not created because the response destination is null.");
+ return;
+ }
+
+ if (!responseDest.equals(_responseDest))
+ {
+ _responseDest = responseDest;
+
+ _logger.info("About to create a producer");
+ _destinationProducer = session.createProducer(responseDest);
+ _destinationProducer.setDisableMessageTimestamp(true);
+ _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ _logger.info("After create a producer");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error creating destination");
+ }
+ _messageCount++;
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Received message total: " + _messageCount);
+ _logger.info("Sending response to '" + _responseDest + "'");
+ }
+
+ try
+ {
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
+ TextMessage msg = session.createTextMessage(payload);
+ if (tm.propertyExists("timeSent"))
+ {
+ _logger.info("timeSent property set on message");
+ _logger.info("timeSent value is: " + tm.getLongProperty("timeSent"));
+ msg.setStringProperty("timeSent", tm.getStringProperty("timeSent"));
+ }
+ _destinationProducer.send(msg);
+ if (_messageCount % 1000 == 0)
+ {
+ _logger.info("Sent response to '" + _responseDest + "'");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error sending message: " + e, e);
+ }
+ }
+ });
+ }
+
+ /**
+  * Starts the connection and logs that the client is now waiting.
+  *
+  * @throws JMSException if the connection cannot be started
+  */
+ public void run() throws JMSException
+ {
+     this._connection.start();
+     _logger.info("Waiting...");
+ }
+
+ /**
+  * Command-line entry point: builds a client from the arguments and runs it.
+  *
+  * @param args brokerDetails, username, password, virtual path and service queue name, in that order
+  */
+ public static void main(String[] args)
+ {
+     _logger.info("Starting...");
+
+     final boolean tooFewArguments = args.length < 5;
+     if (tooFewArguments)
+     {
+         System.out.println("Usage: brokerDetails username password virtual-path serviceQueue [selector]");
+         System.exit(1);
+     }
+
+     // The client id is the local host name followed by the current time; it stays null when
+     // the local host cannot be resolved.
+     String clientId;
+     try
+     {
+         clientId = InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
+     }
+     catch (UnknownHostException e)
+     {
+         _logger.error("Error: " + e, e);
+         clientId = null;
+     }
+
+     try
+     {
+         new ServiceProvidingClient(args[0], args[1], args[2], clientId, args[3], args[4]).run();
+     }
+     catch (JMSException e)
+     {
+         _logger.error("Error: " + e, e);
+     }
+     catch (AMQException e)
+     {
+         _logger.error("Error: " + e, e);
+     }
+     catch (URLSyntaxException e)
+     {
+         _logger.error("Error: " + e, e);
+     }
+ }
+
+}
+
Propchange: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java?view=auto&rev=491577
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java Mon Jan 1 06:30:31 2007
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jms.MessageConsumer;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+import javax.jms.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A client that behaves as follows:
+ * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
+ * <li>Creates a temporary queue</li>
+ * <li>Creates messages whose JMSReplyTo header is set to the temporary queue</li>
+ * <li>Fires off the messages on the original queue and waits for the responses on the temporary queue</li>
+ * </ul>
+ *
+ */
+public class ServiceRequestingClient implements ExceptionListener
+{
+ private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
+
+ private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ private String MESSAGE_DATA;
+
+ private AMQConnection _connection;
+
+ private Session _session;
+
+ private long _averageLatency;
+
+ private int _messageCount;
+
+ private volatile boolean _completed;
+
+ private AMQDestination _tempDestination;
+
+ private MessageProducer _producer;
+
+ private Object _waiter;
+
+ private static String createMessagePayload(int size)
+ {
+ _log.info("Message size set to " + size + " bytes");
+ StringBuffer buf = new StringBuffer(size);
+ int count = 0;
+ while (count < size + MESSAGE_DATA_BYTES.length())
+ {
+ buf.append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.length();
+ }
+ if (count < size)
+ {
+ buf.append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.toString();
+ }
+
+ /**
+  * Listener for responses on the temporary queue. It counts the responses, tracks the mean
+  * latency of those carrying a "timeSent" property and, once the expected number has arrived,
+  * marks the test completed, notifies the waiter, logs the throughput and closes the connection.
+  */
+ private class CallbackHandler implements MessageListener
+ {
+     private int _expectedMessageCount;
+
+     private int _actualMessageCount;
+
+     private long _startTime;
+
+     /** Sum of all latency samples seen so far, in milliseconds. */
+     private long _latencyTotal;
+
+     /** Number of latency samples seen so far. */
+     private int _latencySamples;
+
+     /**
+      * @param expectedMessageCount number of responses after which the test is complete
+      * @param startTime            wall-clock start of the test, in milliseconds
+      */
+     public CallbackHandler(int expectedMessageCount, long startTime)
+     {
+         _expectedMessageCount = expectedMessageCount;
+         _startTime = startTime;
+     }
+
+     public void onMessage(Message m)
+     {
+         if (_log.isDebugEnabled())
+         {
+             _log.debug("Message received: " + m);
+         }
+         try
+         {
+             if (m.propertyExists("timeSent"))
+             {
+                 long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
+                 long latency = System.currentTimeMillis() - timeSent;
+                 // Keep a true running mean. Repeatedly halving (average + sample) weights the
+                 // newest sample at 50%, and using 0 as "unset" mishandles a 0 ms first sample.
+                 _latencyTotal += latency;
+                 _latencySamples++;
+                 _averageLatency = _latencyTotal / _latencySamples;
+                 if (_latencySamples == 1)
+                 {
+                     _log.info("Latency " + _averageLatency);
+                 }
+                 else
+                 {
+                     _log.info("Individual latency: " + latency);
+                     _log.info("Average latency now: " + _averageLatency);
+                 }
+             }
+         }
+         catch (JMSException e)
+         {
+             _log.error("Error getting latency data: " + e, e);
+         }
+         catch (NumberFormatException e)
+         {
+             // A malformed timestamp must not abort delivery; the message is still counted below.
+             _log.error("Error getting latency data: " + e, e);
+         }
+         _actualMessageCount++;
+         if (_actualMessageCount % 1000 == 0)
+         {
+             _log.info("Received message count: " + _actualMessageCount);
+         }
+
+         if (_actualMessageCount == _expectedMessageCount)
+         {
+             _completed = true;
+             notifyWaiter();
+             long timeTaken = System.currentTimeMillis() - _startTime;
+             _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
+                       timeTaken + "ms, equivalent to " +
+                       (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");
+
+             // NOTE(review): this closes the connection from inside its own message listener,
+             // which JMS documentation warns against -- confirm the Qpid client tolerates it.
+             try
+             {
+                 _connection.close();
+                 _log.info("Connection closed");
+             }
+             catch (JMSException e)
+             {
+                 _log.error("Error closing connection", e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Wakes one thread waiting on the waiter object's monitor; does nothing if no waiter is set.
+  */
+ private void notifyWaiter()
+ {
+     if (_waiter == null)
+     {
+         return;
+     }
+     synchronized (_waiter)
+     {
+         _waiter.notify();
+     }
+ }
+ public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
+ String vpath, String commandQueueName,
+ final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
+ {
+ _messageCount = messageCount;
+ MESSAGE_DATA = createMessagePayload(messageDataLength);
+ try
+ {
+ createConnection(brokerHosts, clientID, username, password, vpath);
+ _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ _connection.setExceptionListener(this);
+
+
+ AMQQueue destination = new AMQQueue(commandQueueName);
+ _producer = (MessageProducer) _session.createProducer(destination);
+ _producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ _tempDestination = new AMQQueue("TempResponse" +
+ Long.toString(System.currentTimeMillis()), true);
+ MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true,
+ true, null);
+
+ //Send first message, then wait a bit to allow the provider to get initialised
+ TextMessage first = _session.createTextMessage(MESSAGE_DATA);
+ first.setJMSReplyTo(_tempDestination);
+ _producer.send(first);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ //now start the clock and the test...
+ final long startTime = System.currentTimeMillis();
+
+ messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ /**
+  * Starts the connection and sends the remaining requests, remembering the object to notify
+  * once every response has been received. Numbering starts at 1 because the constructor has
+  * already sent the first request. Every 1000th request carries a "timeSent" property holding
+  * the current time in milliseconds.
+  *
+  * @param waiter the object that will be notified
+  * @throws JMSException if starting the connection or sending a request fails
+  */
+ public void run(Object waiter) throws JMSException
+ {
+     _waiter = waiter;
+     _connection.start();
+     int i = 1;
+     while (i < _messageCount)
+     {
+         final TextMessage request = _session.createTextMessage(MESSAGE_DATA + i);
+         request.setJMSReplyTo(_tempDestination);
+         final boolean stampSendTime = (i % 1000 == 0);
+         if (stampSendTime)
+         {
+             request.setStringProperty("timeSent", String.valueOf(System.currentTimeMillis()));
+         }
+         _producer.send(request);
+         i++;
+     }
+     _log.info("Finished sending " + _messageCount + " messages");
+ }
+
+ /**
+  * @return true once the response listener has received the expected number of messages
+  */
+ public boolean isCompleted()
+ {
+     return this._completed;
+ }
+
+ /**
+  * Creates the broker connection and stores it in the _connection field.
+  *
+  * @throws AMQException       propagated from the AMQConnection constructor
+  * @throws URLSyntaxException propagated from the AMQConnection constructor
+  */
+ private void createConnection(String brokerHosts, String clientID, String username, String password,
+                               String vpath) throws AMQException, URLSyntaxException
+ {
+     this._connection = new AMQConnection(brokerHosts, username, password, clientID, vpath);
+ }
+
+ /**
+  * Command-line entry point: creates a client, runs the test and blocks until every response
+  * has been received.
+  *
+  * @param args brokerDetails, username, password, vpath, command queue name and number of
+  *             messages, optionally followed by the message size (4096 when omitted)
+  */
+ public static void main(String[] args)
+ {
+     if (args.length < 6)
+     {
+         System.err.println(
+                 "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
+         // Stop here; continuing would fail with ArrayIndexOutOfBoundsException below.
+         System.exit(1);
+     }
+     try
+     {
+         int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;
+
+         InetAddress address = InetAddress.getLocalHost();
+         String clientID = address.getHostName() + System.currentTimeMillis();
+         ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
+                                                                      args[4], Integer.parseInt(args[5]),
+                                                                      messageDataLength);
+         Object waiter = new Object();
+         client.run(waiter);
+         // Re-check the completion flag under the waiter's monitor so that neither a spurious
+         // wake-up nor a completion that happened before we got here is mishandled.
+         synchronized (waiter)
+         {
+             while (!client.isCompleted())
+             {
+                 waiter.wait();
+             }
+         }
+
+     }
+     catch (UnknownHostException e)
+     {
+         e.printStackTrace();
+     }
+     catch (Exception e)
+     {
+         System.err.println("Error in client: " + e);
+         e.printStackTrace();
+     }
+ }
+
+ /**
+  * Reports an asynchronous connection error on standard error: the message first, then the
+  * stack trace.
+  *
+  * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+  */
+ public void onException(JMSException e)
+ {
+     final String reason = e.getMessage();
+     System.err.println(reason);
+     e.printStackTrace(System.err);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
------------------------------------------------------------------------------
svn:eol-style = native