You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/11/29 23:46:31 UTC
svn commit: r599611 - in
/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf:
JMSAsyncConsumer.java JMSConsumer.java JMSProducer.java
JMSSyncConsumer.java MessageConsumerTest.java MessageProducerTest.java
Options.java
Author: rajith
Date: Thu Nov 29 14:46:28 2007
New Revision: 599611
URL: http://svn.apache.org/viewvc?rev=599611&view=rev
Log:
Removed all the fancy threading from the test cases, as it interferes with the tests
Removed:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java?rev=599611&r1=599610&r2=599611&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java Thu Nov 29 14:46:28 2007
@@ -4,114 +4,96 @@
import java.io.IOException;
import java.sql.Date;
import java.text.SimpleDateFormat;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.message.TestMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessageConsumerTest extends Options implements Runnable
+public class MessageConsumerTest extends Options implements MessageListener
{
private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class);
private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
- private Map<Integer, JMSConsumer> _consumers = new ConcurrentHashMap<Integer, JMSConsumer>();
- private int _count;
String _logFileName;
- private long _gracePeriod = 5 * 60 * 1000;
long _startTime;
long _totalMsgCount;
+ long _intervalCount;
- public void start() throws Exception
- {
- this.parseOptions();
- boolean useSameDest = true;
- _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
-
- // use each destination with a different consumerucer
- if (_consumerCount == destArray.length)
- {
- useSameDest = false;
- }
- for (; _count < _consumerCount; _count++)
- {
- createAndStartConsumer(useSameDest ? destArray[0] : destArray[_count]);
- }
- }
-
- private void createAndStartConsumer(String routingKey) throws Exception
- {
- AMQConnection con = ConnectionUtility.getInstance().getConnection();
- con.start();
- Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(con,routingKey) : new AMQTopic(con,routingKey);
- JMSConsumer consumer;
- if (_synchronous)
- {
- consumer = new JMSSyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread((JMSSyncConsumer) consumer);
- t.setName("JMSSyncConsumer-" + _count);
- t.start();
- }
- else
- {
- consumer = new JMSAsyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
- }
- _consumers.put(_count, consumer);
- }
-
- private void startTimerThread()
- {
- _startTime = System.currentTimeMillis();
- if(Boolean.getBoolean("collect_stats"))
- {
- Thread t = new Thread(this);
- t.setName("MessageConsumerTest-TimerThread");
- t.start();
- }
- try
- {
- printSummary();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
+ private Connection _connection;
+ private Session _session;
+ private BytesMessage _payload;
+ private MessageConsumer _consumer;
+ private boolean _verifyOrder = false;
+
+ public void init() throws Exception
+ {
+ this.parseOptions();
+ _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
+
+ AMQConnection _connection = ConnectionUtility.getInstance().getConnection();
+ _connection.start();
+ Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination);
+ _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE);
+ _payload = TestMessageFactory.newBytesMessage(_session, _messageSize);
+ _consumer = _session.createConsumer(dest);
+ if(!_synchronous)
+ {
+ _consumer.setMessageListener(this);
+ }
+ _verifyOrder = Boolean.getBoolean("verifyOrder");
+
+ _startTime = System.currentTimeMillis();
+ boolean run = true;
+ if(Boolean.getBoolean("collect_stats"))
+ {
+ printHeading();
+ runReaper();
+ }
}
- public void run()
+ public void onMessage(Message message)
{
- boolean run = true;
- printHeading();
- runReaper();
try
{
- while (run)
+ /* long msgId = Integer.parseInt(message.getJMSMessageID());
+ if (_verifyOrder && _totalMsgCount+1 != msgId)
{
- Thread.sleep(_logDuration);
- runReaper();
-
- if (System.currentTimeMillis() + _gracePeriod - _startTime > _expiry)
+ _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
+ }*/
+ message = null;
+ _totalMsgCount ++;
+ _intervalCount++;
+ if(_intervalCount >= _logFrequency)
+ {
+ _intervalCount = 0;
+ if (Boolean.getBoolean("collect_stats"))
{
- // time to stop the test.
- for (Integer id : _consumers.keySet())
- {
- JMSConsumer consumer = _consumers.get(id);
- consumer.stopConsuming();
- }
runReaper();
- run = false;
+ }
+ if (System.currentTimeMillis() - _startTime >= _expiry)
+ {
+ _session.close();
+ _connection.stop();
+ printSummary();
+ return;
}
}
}
- catch (InterruptedException e)
+ catch(Exception e)
{
- _logger.error("The timer thread exited", e);
+ e.printStackTrace();
}
}
@@ -119,16 +101,6 @@
{
try
{
- long totalMsgCountThisInterval = 0;
-
- for (Integer id : _consumers.keySet())
- {
- JMSConsumer consumer = _consumers.get(id);
- totalMsgCountThisInterval = totalMsgCountThisInterval + consumer.getCurrentMessageCount();
-
- }
- _totalMsgCount = _totalMsgCount + totalMsgCountThisInterval;
-
FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
StringBuffer buf = new StringBuffer();
Date d = new Date(System.currentTimeMillis());
@@ -137,8 +109,6 @@
buf.append(d.getTime()).append(",");
buf.append(_totalMsgCount).append(",");
buf.append(_totalMsgCount*1000 /totaltime).append(",");
- buf.append(totalMsgCountThisInterval).append(",");
- buf.append(totalMsgCountThisInterval*1000/_logDuration).append(",");
buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
buf.append("\n");
_memoryLog.write(buf.toString());
@@ -156,7 +126,7 @@
try
{
FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
- String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),interval count,interval rate (msg/sec),memory";
+ String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory";
_memoryLog.write(s);
_memoryLog.close();
}
@@ -166,33 +136,31 @@
}
}
- private void printSummary() throws Exception
+ private void printSummary()
{
- if (Boolean.getBoolean("collect_stats"))
+ try
{
- for (Integer id : _consumers.keySet())
- {
- JMSConsumer consumer = _consumers.get(id);
- _totalMsgCount = _totalMsgCount + consumer.getCurrentMessageCount();
- }
+ long current = System.currentTimeMillis();
+ double time = current - _startTime;
+ double ratio = _totalMsgCount*1000/time;
+ FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
+
+ StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : ");
+ buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
+ Date d = new Date(current);
+ buf.append(df.format(d)).append("\n Total Time taken (ms):");
+ buf.append(time).append("\n Total messages sent:");
+ buf.append(_totalMsgCount).append("\n producer rate:");
+ buf.append(ratio).append("\n");
+ _summaryLog.write(buf.toString());
+ System.out.println("---------- Test Ended -------------");
+ _summaryLog.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
}
-
- long current = System.currentTimeMillis();
- double time = current - _startTime;
- double ratio = _totalMsgCount*1000/time;
- FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
-
- StringBuffer buf = new StringBuffer("MessageConsumerTest \n Test started at : ");
- buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
- Date d = new Date(current);
- buf.append(df.format(d)).append("\n Total Time taken (ms):");
- buf.append(time).append("\n Total messages sent:");
- buf.append(_totalMsgCount).append("\n consumer rate:");
- buf.append(ratio).append("\n");
- _summaryLog.write(buf.toString());
- System.out.println("---------- Test Ended -------------");
- _summaryLog.close();
}
public static void main(String[] args)
@@ -200,8 +168,7 @@
try
{
MessageConsumerTest test = new MessageConsumerTest();
- test.start();
- test.startTimerThread();
+ test.init();
}
catch (Exception e)
{
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java?rev=599611&r1=599610&r2=599611&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java Thu Nov 29 14:46:28 2007
@@ -7,119 +7,97 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.message.TestMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessageProducerTest extends Options implements Runnable
+public class MessageProducerTest extends Options
{
private static final Logger _logger = LoggerFactory.getLogger(MessageProducerTest.class);
private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
- private Map<Integer,JMSProducer> _producers = new ConcurrentHashMap<Integer,JMSProducer>();
- private int _count;
String _logFileName;
long _startTime;
long _totalMsgCount;
+ long _intervalCount;
- public void start() throws Exception
+ private Connection _connection;
+ private Session _session;
+ private BytesMessage _payload;
+ private MessageProducer _producer;
+
+ public void init() throws Exception
{
this.parseOptions();
- boolean useSameDest = true;
_logFileName = _logFilePath + "/MessageProducerTest_" + System.currentTimeMillis();
- // use each destination with a different producer
- if (_producerCount == destArray.length)
- {
- useSameDest = false;
- }
- for (;_count < _producerCount;_count++)
- {
- createAndStartProducer(useSameDest?destArray[0]:destArray[_count]);
- }
+ AMQConnection _connection = ConnectionUtility.getInstance().getConnection();
+ _connection.start();
+ Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination);
+ _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE);
+ _payload = TestMessageFactory.newBytesMessage(_session, _messageSize);
+ _producer = _session.createProducer(dest);
+ // this should speedup the message producer
+ _producer.setDisableMessageTimestamp(true);
}
- private void createAndStartProducer(String routingKey)throws Exception
- {
- AMQConnection con = ConnectionUtility.getInstance().getConnection();
- con.start();
- Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(con,routingKey) : new AMQTopic(con,routingKey);
- JMSProducer prod = new JMSProducer(String.valueOf(_count),(Connection)con, dest,_messageSize, _transacted);
- Thread t = new Thread(prod);
- t.setName("JMSProducer-"+_count);
- t.start();
- _producers.put(_count, prod);
- }
-
- private void startTimerThread()
+ public void run()
{
_startTime = System.currentTimeMillis();
+ boolean run = true;
if(Boolean.getBoolean("collect_stats"))
{
- Thread t = new Thread(this);
- t.setName("MessageProducerTest-TimerThread");
- t.start();
- }
- try
- {
- printSummary();
+ printHeading();
+ runReaper();
}
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- public void run()
- {
- boolean run = true;
- printHeading();
- runReaper();
try
{
while (run)
{
- Thread.sleep(_logDuration);
- runReaper();
+ _payload.setJMSMessageID(String.valueOf(_totalMsgCount+1));
+ _producer.send(_payload);
+ _totalMsgCount ++;
+ _intervalCount ++;
- if (System.currentTimeMillis() - _startTime > _expiry)
+ // check every x messages to see if times up
+ if(_intervalCount >= _logFrequency)
{
- // time to stop the test.
- for (Integer id : _producers.keySet())
+ _intervalCount = 0;
+ if (Boolean.getBoolean("collect_stats"))
{
- JMSProducer producer = _producers.get(id);
- producer.stopProducing();
+ runReaper();
+ }
+ if (System.currentTimeMillis() - _startTime >= _expiry)
+ {
+ // time to stop the test.
+ _session.close();
+ _connection.stop();
+ run = false;
}
- runReaper();
- run = false;
}
}
}
- catch (InterruptedException e)
+ catch (Exception e)
{
_logger.error("The timer thread exited", e);
}
+ printSummary();
}
public void runReaper()
{
try
{
- long totalMsgCountThisInterval = 0;
-
- for (Integer id : _producers.keySet())
- {
- JMSProducer producer = _producers.get(id);
- totalMsgCountThisInterval = totalMsgCountThisInterval + producer.getCurrentMessageCount();
-
- }
- _totalMsgCount = _totalMsgCount + totalMsgCountThisInterval;
-
FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
StringBuffer buf = new StringBuffer();
Date d = new Date(System.currentTimeMillis());
@@ -128,8 +106,6 @@
buf.append(d.getTime()).append(",");
buf.append(_totalMsgCount).append(",");
buf.append(_totalMsgCount*1000 /totaltime).append(",");
- buf.append(totalMsgCountThisInterval).append(",");
- buf.append(totalMsgCountThisInterval*1000/_logDuration).append(",");
buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
buf.append("\n");
_memoryLog.write(buf.toString());
@@ -147,7 +123,7 @@
try
{
FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true);
- String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),interval count,interval rate (msg/sec),memory";
+ String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory";
_memoryLog.write(s);
_memoryLog.close();
}
@@ -157,33 +133,31 @@
}
}
- private void printSummary() throws Exception
+ private void printSummary()
{
- if (Boolean.getBoolean("collect_stats"))
+ try
{
- for (Integer id : _producers.keySet())
- {
- JMSProducer producer = _producers.get(id);
- _totalMsgCount = _totalMsgCount + producer.getCurrentMessageCount();
- }
+ long current = System.currentTimeMillis();
+ double time = current - _startTime;
+ double ratio = _totalMsgCount*1000/time;
+ FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
+
+ StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : ");
+ buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
+ Date d = new Date(current);
+ buf.append(df.format(d)).append("\n Total Time taken (ms):");
+ buf.append(time).append("\n Total messages sent:");
+ buf.append(_totalMsgCount).append("\n producer rate:");
+ buf.append(ratio).append("\n");
+ _summaryLog.write(buf.toString());
+ System.out.println("---------- Test Ended -------------");
+ _summaryLog.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
}
-
- long current = System.currentTimeMillis();
- double time = current - _startTime;
- double ratio = _totalMsgCount*1000/time;
- FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
-
- StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : ");
- buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
- Date d = new Date(current);
- buf.append(df.format(d)).append("\n Total Time taken (ms):");
- buf.append(time).append("\n Total messages sent:");
- buf.append(_totalMsgCount).append("\n producer rate:");
- buf.append(ratio).append("\n");
- _summaryLog.write(buf.toString());
- System.out.println("---------- Test Ended -------------");
- _summaryLog.close();
}
public static void main(String[] args)
@@ -191,8 +165,8 @@
try
{
MessageProducerTest test = new MessageProducerTest();
- test.start();
- test.startTimerThread();
+ test.init();
+ test.run();
}
catch(Exception e)
{
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java?rev=599611&r1=599610&r2=599611&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java Thu Nov 29 14:46:28 2007
@@ -5,11 +5,9 @@
int _messageSize;
boolean _transacted;
boolean _synchronous;
- String[] destArray;
- int _producerCount;
- int _consumerCount;
+ String _destination;
long _expiry;
- long _logDuration;
+ long _logFrequency;
String _logFilePath;
/**
@@ -17,8 +15,6 @@
* -DmessageSize
* -DuseQueue
* -Dtransacted
- * -DproducerCount
- * -DconsumerCount
* -Ddestinations
* -DlogFilePath
* -Duration=1H,30M,10S
@@ -27,21 +23,16 @@
public void parseOptions()
{
_messageSize = Integer.parseInt(System.getProperty("messageSize","100"));
- _synchronous = Boolean.parseBoolean( System.getProperty("synchronous", "false"));
_transacted = false;
- String destinations = System.getProperty("destinations");
- destArray = destinations.split(",");
- _producerCount = Integer.parseInt(System.getProperty("producerCount","1"));
- _consumerCount = Integer.parseInt(System.getProperty("consumerCount","1"));
- _logDuration = Long.parseLong(System.getProperty("logDuration","10"));
- _logDuration = _logDuration*1000*60;
+ _destination = System.getProperty("destinations");
+ _logFrequency = Long.parseLong(System.getProperty("logDuration","10"));
_logFilePath = System.getProperty("logFilePath");
_expiry = getExpiry();
System.out.println("============= Test Data ===================");
- System.out.println("Total no of producers : " + _producerCount);
- System.out.println(_synchronous? "Total no of synchronous consumers : " : "Total no of asynchronous consumers :" + _consumerCount);
- System.out.println("Log Frequency in mins : " + _logDuration/(1000*60));
+ System.out.println("Destination : " + _destination);
+ System.out.println("Collect stats : " + Boolean.getBoolean("collect_stats"));
+ System.out.println("Log Frequency in msgs : " + _logFrequency);
System.out.println("Log file path : " + _logFilePath);
System.out.println("Test Duration : " + printTestDuration());
System.out.println("============= /Test Data ===================");