You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/11/14 10:38:06 UTC
svn commit: r594814 - in
/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf:
JMSAsyncConsumer.java JMSConsumer.java JMSSyncConsumer.java
MessageConsumerTest.java Options.java
Author: arnaudsimon
Date: Wed Nov 14 01:38:05 2007
New Revision: 594814
URL: http://svn.apache.org/viewvc?rev=594814&view=rev
Log:
Added distinction between sync and async consumers
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java
- copied, changed from r594797, incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
Modified:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java?rev=594814&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java Wed Nov 14 01:38:05 2007
@@ -0,0 +1,91 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.perf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ *
+ */
+public class JMSAsyncConsumer implements MessageListener, JMSConsumer
+{
+ private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class);
+
+ private String _id;
+ private Connection _connection;
+ private Session _session;
+ private MessageConsumer _consumer;
+ private Destination _destination;
+ private boolean _transacted;
+ private int _ackMode = Session.AUTO_ACKNOWLEDGE;
+ private AtomicBoolean _run = new AtomicBoolean(true);
+ private long _currentMsgCount;
+
+ /* Not implementing transactions for first phase */
+ public JMSAsyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
+ {
+ _id = id;
+ _connection = connection;
+ _destination = destination;
+ _transacted = transacted;
+ _ackMode = ackMode;
+ _session = _connection.createSession(_transacted, _ackMode);
+ _consumer = _session.createConsumer(_destination);
+ _consumer.setMessageListener(this);
+ }
+
+
+
+ public void onMessage(Message message)
+ {
+ _currentMsgCount ++;
+ }
+
+ public void stopConsuming()
+ {
+ System.out.println("Producer received notification to stop");
+ try
+ {
+ _session.close();
+ _connection.close();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error Closing JMSSyncConsumer:"+ _id, e);
+ }
+ }
+
+ public String getId()
+ {
+ return _id;
+ }
+
+ /* Not worried about synchronizing as accuracy here is not that important.
+ * So if this method is invoked the count maybe off by a few digits.
+ * But when the test stops, this will always return the proper count.
+ */
+ public long getCurrentMessageCount()
+ {
+ return _currentMsgCount;
+ }
+}
Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java?rev=594814&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java Wed Nov 14 01:38:05 2007
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.perf;
+
+/**
+ * Common contract implemented by the perf-test consumers, so that callers can
+ * identify, poll and stop a consumer without knowing how it receives messages.
+ */
+public interface JMSConsumer
+{
+    String getId();                 // identifier of this consumer
+    long getCurrentMessageCount();  // number of messages counted so far
+    void stopConsuming();           // asks the consumer to stop
+}
Copied: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java (from r594797, incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java?p2=incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java&p1=incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java&r1=594797&r2=594814&rev=594814&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java Wed Nov 14 01:38:05 2007
@@ -11,9 +11,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JMSConsumer implements Runnable
+public class JMSSyncConsumer implements Runnable, JMSConsumer
{
- private static final Logger _logger = LoggerFactory.getLogger(JMSConsumer.class);
+ private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class);
private String _id;
private Connection _connection;
@@ -26,7 +26,7 @@
private long _currentMsgCount;
/* Not implementing transactions for first phase */
- public JMSConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
+ public JMSSyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
{
_id = id;
_connection = connection;
@@ -56,17 +56,17 @@
BytesMessage msg = (BytesMessage)_consumer.receive();
if (msg != null)
{
- long msgId = Integer.parseInt(msg.getJMSCorrelationID());
+ // long msgId = Integer.parseInt(msg.getJMSCorrelationID());
/*if (_currentMsgCount+1 != msgId)
{
- _logger.error("Error : Message received out of order in JMSConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
+ _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
}*/
_currentMsgCount ++;
}
}
catch(Exception e)
{
- _logger.error("Error Receiving message from JMSConsumer:" + _id, e);
+ _logger.error("Error Receiving message from JMSSyncConsumer:" + _id, e);
}
}
try
@@ -76,7 +76,7 @@
}
catch(Exception e)
{
- _logger.error("Error Closing JMSConsumer:"+ _id, e);
+ _logger.error("Error Closing JMSSyncConsumer:"+ _id, e);
}
}
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java?rev=594814&r1=594813&r2=594814&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java Wed Nov 14 01:38:05 2007
@@ -1,13 +1,11 @@
package org.apache.qpid.client.perf;
import java.io.FileWriter;
-import java.io.RandomAccessFile;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Session;
@@ -21,7 +19,7 @@
private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class);
private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
- private Map<Integer,JMSConsumer> _consumers = new ConcurrentHashMap<Integer,JMSConsumer>();
+ private Map<Integer, JMSConsumer> _consumers = new ConcurrentHashMap<Integer, JMSConsumer>();
private int _count;
String _logFileName;
private long _gracePeriod = 5 * 60 * 1000;
@@ -30,30 +28,38 @@
public void start() throws Exception
{
- this.parseOptions();
- boolean useSameDest = true;
- _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
-
- // use each destination with a different producer
- if (_producerCount == destArray.length)
- {
- useSameDest = false;
- }
- for (;_count < _producerCount;_count++)
- {
- createAndStartProducer(useSameDest?destArray[0]:destArray[_count]);
- }
+ this.parseOptions();
+ boolean useSameDest = true;
+ _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
+
+ // use each destination with a different producer
+ if (_producerCount == destArray.length)
+ {
+ useSameDest = false;
+ }
+ for (; _count < _producerCount; _count++)
+ {
+ createAndStartConsumer(useSameDest ? destArray[0] : destArray[_count]);
+ }
}
- private void createAndStartProducer(String routingKey)throws Exception
+ private void createAndStartConsumer(String routingKey) throws Exception
{
AMQConnection con = ConnectionUtility.getInstance().getConnection();
con.start();
- Destination dest = new AMQTopic(con,routingKey);
- JMSConsumer prod = new JMSConsumer(String.valueOf(_count),(Connection)con, dest, _transacted,Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(prod);
- t.setName("JMSConsumer-"+_count);
- t.start();
+ Destination dest = new AMQTopic(con, routingKey);
+ JMSConsumer prod;
+ if (_synchronous)
+ {
+ prod = new JMSSyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
+ Thread t = new Thread((JMSSyncConsumer) prod);
+ t.setName("JMSSyncConsumer-" + _count);
+ t.start();
+ }
+ else
+ {
+ prod = new JMSAsyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
+ }
_consumers.put(_count, prod);
}
@@ -71,12 +77,12 @@
runReaper(false);
try
{
- while(run)
+ while (run)
{
Thread.sleep(_logDuration);
runReaper(false);
- if(System.currentTimeMillis() + _gracePeriod - _startTime > _expiry )
+ if (System.currentTimeMillis() + _gracePeriod - _startTime > _expiry)
{
// time to stop the test.
for (Integer id : _consumers.keySet())
@@ -91,7 +97,7 @@
}
catch (InterruptedException e)
{
- _logger.error("The timer thread exited",e);
+ _logger.error("The timer thread exited", e);
}
}
@@ -99,11 +105,11 @@
{
try
{
- FileWriter _logFile = new FileWriter(_logFileName + ".csv",true);
+ FileWriter _logFile = new FileWriter(_logFileName + ".csv", true);
for (Integer id : _consumers.keySet())
{
JMSConsumer prod = _consumers.get(id);
- StringBuffer buf = new StringBuffer("JMSConsumer(").append(prod.getId()).append("),");
+ StringBuffer buf = new StringBuffer("JMSSyncConsumer(").append(prod.getId()).append("),");
Date d = new Date(System.currentTimeMillis());
buf.append(df.format(d)).append(",");
buf.append(d.getTime()).append(",");
@@ -113,21 +119,21 @@
}
_logFile.close();
- FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv",true);
- StringBuffer buf = new StringBuffer("JMSConsumer,");
+ FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv", true);
+ StringBuffer buf = new StringBuffer("JMSSyncConsumer,");
Date d = new Date(System.currentTimeMillis());
buf.append(df.format(d)).append(",");
buf.append(d.getTime()).append(",");
buf.append(totalMsgCount).append(",");
- buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
+ buf.append(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()).append("\n");
_memoryLog.write(buf.toString());
_memoryLog.close();
if (printSummary)
{
double totaltime = d.getTime() - _startTime;
double dCount = totalMsgCount;
- double ratio = (dCount/totaltime)*1000;
- FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
+ double ratio = (dCount / totaltime) * 1000;
+ FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary", true);
buf = new StringBuffer("MessageConsumerTest \n Test started at : ");
buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
d = new Date(System.currentTimeMillis());
@@ -140,9 +146,9 @@
_summaryLog.close();
}
}
- catch(Exception e)
+ catch (Exception e)
{
- _logger.error("Error printing info to the log file",e);
+ _logger.error("Error printing info to the log file", e);
}
}
@@ -154,7 +160,7 @@
test.start();
test.startTimerThread();
}
- catch(Exception e)
+ catch (Exception e)
{
e.printStackTrace();
}
Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java?rev=594814&r1=594813&r2=594814&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java Wed Nov 14 01:38:05 2007
@@ -4,6 +4,7 @@
{
int _messageSize;
boolean _transacted;
+ boolean _synchronous;
String[] destArray;
int _producerCount;
int _consumerCount;
@@ -25,6 +26,7 @@
public void parseOptions()
{
_messageSize = Integer.parseInt(System.getProperty("messageSize","100"));
+ _synchronous =Boolean.parseBoolean( System.getProperty("synchronous", "false"));
_transacted = false;
String destinations = System.getProperty("destinations");
destArray = destinations.split(",");
@@ -37,7 +39,7 @@
System.out.println("============= Test Data ===================");
System.out.println("Total no of producers : " + _producerCount);
- System.out.println("Total no of consumer : " + _consumerCount);
+ System.out.println(_synchronous? "Total no of synchronous consumers : " : "Total no of asynchronous consumers :" + _consumerCount);
System.out.println("Log Frequency in mins : " + _logDuration/(1000*60));
System.out.println("Log file path : " + _logFilePath);
System.out.println("Test Duration : " + printTestDuration());