You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/07 14:51:08 UTC
svn commit: r609606 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/transport/tcp/
test/java/org/apache/activemq/load/
Author: rajdavies
Date: Mon Jan 7 05:51:06 2008
New Revision: 609606
URL: http://svn.apache.org/viewvc?rev=609606&view=rev
Log:
fixes for load testing
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Mon Jan 7 05:51:06 2008
@@ -176,6 +176,7 @@
public synchronized void gc() {
for (Message msg : batchList) {
+ rollback(msg.getMessageId());
msg.decrementReferenceCount();
}
cacheEnabled=false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Jan 7 05:51:06 2008
@@ -231,6 +231,7 @@
public synchronized void gc() {
for (Message msg : batchList.values()) {
+ rollback(msg.getMessageId());
msg.decrementReferenceCount();
}
batchList.clear();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Mon Jan 7 05:51:06 2008
@@ -27,9 +27,13 @@
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory;
+import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
@@ -57,6 +61,7 @@
protected final TcpTransportFactory transportFactory;
protected long maxInactivityDuration = 30000;
protected int minmumWireFormatVersion;
+
/**
* trace=true -> the Transport stack where this TcpTransport
* object will be, will have a TransportLogger layer
@@ -83,11 +88,14 @@
protected boolean startLogging = true;
protected Map<String, Object> transportOptions;
protected final ServerSocketFactory serverSocketFactory;
-
+ protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+ protected Thread socketHandlerThread;
+
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location);
this.transportFactory = transportFactory;
this.serverSocketFactory = serverSocketFactory;
+
}
public void bind() throws IOException {
@@ -199,18 +207,7 @@
if (isStopped() || getAcceptListener() == null) {
socket.close();
} else {
- HashMap<String, Object> options = new HashMap<String, Object>();
- options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
- options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
- options.put("trace", Boolean.valueOf(trace));
- options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
- options.put("startLogging", Boolean.valueOf(startLogging));
-
- options.putAll(transportOptions);
- WireFormat format = wireFormatFactory.createWireFormat();
- Transport transport = createTransport(socket, format);
- Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
- getAcceptListener().onAccept(configuredTransport);
+ socketQueue.put(socket);
}
}
} catch (SocketTimeoutException ste) {
@@ -259,6 +256,36 @@
}
return result;
}
+
+ protected void doStart() throws Exception {
+ Runnable run = new Runnable() {
+ public void run() {
+ try {
+ while (!isStopped() && !isStopping()) {
+ Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
+ if (sock != null) {
+ handleSocket(sock);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ LOG.info("socketQueue interuppted - stopping");
+ if (!isStopping()) {
+ onAcceptError(e);
+ }
+ }
+
+ }
+
+ };
+ socketHandlerThread = new Thread(null, run,
+ "ActiveMQ Transport Server Thread Handler: " + toString(),
+ getStackSize());
+ socketHandlerThread.setDaemon(true);
+ socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+ super.doStart();
+ socketHandlerThread.start();
+ }
protected void doStop(ServiceStopper stopper) throws Exception {
super.doStop(stopper);
@@ -274,4 +301,37 @@
public void setTransportOption(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
-}
+
+ protected void handleSocket(Socket socket) {
+ try {
+ HashMap<String, Object> options = new HashMap<String, Object>();
+ options.put("maxInactivityDuration", Long
+ .valueOf(maxInactivityDuration));
+ options.put("minmumWireFormatVersion", Integer
+ .valueOf(minmumWireFormatVersion));
+ options.put("trace", Boolean.valueOf(trace));
+ options
+ .put("dynamicManagement", Boolean
+ .valueOf(dynamicManagement));
+ options.put("startLogging", Boolean.valueOf(startLogging));
+
+ options.putAll(transportOptions);
+ WireFormat format = wireFormatFactory.createWireFormat();
+ Transport transport = createTransport(socket, format);
+ Transport configuredTransport = transportFactory.serverConfigure(
+ transport, format, options);
+ getAcceptListener().onAccept(configuredTransport);
+ } catch (SocketTimeoutException ste) {
+ // expect this to happen
+ } catch (Exception e) {
+ if (!isStopping()) {
+ onAcceptError(e);
+ } else if (!isStopped()) {
+ LOG.warn("run()", e);
+ onAcceptError(e);
+ }
+ }
+ }
+
+
+}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java Mon Jan 7 05:51:06 2008
@@ -16,9 +16,6 @@
*/
package org.apache.activemq.load;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@@ -29,12 +26,17 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.perf.PerfRate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.3 $
*/
public class LoadClient implements Runnable{
+ private static final Log LOG = LogFactory.getLog(LoadClient.class);
+ protected static int SLEEP_TIME = 2;
protected String name;
protected ConnectionFactory factory;
protected Connection connection;
@@ -45,9 +47,10 @@
protected MessageProducer producer;
protected PerfRate rate = new PerfRate();
protected int deliveryMode = DeliveryMode.PERSISTENT;
- private boolean connectionPerMessage = false;
- private boolean running;
- private int timeout = 10000;
+ protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+ protected boolean connectionPerMessage = false;
+ protected boolean running;
+ protected int timeout = 10000;
public LoadClient(String name,ConnectionFactory factory) {
@@ -65,8 +68,8 @@
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createConsumer(this.startDestination);
- producer = session.createProducer(this.nextDestination);
+ consumer = session.createConsumer(getConsumeDestination());
+ producer = session.createProducer(getSendDestination());
producer.setDeliveryMode(this.deliveryMode);
}
@@ -79,7 +82,9 @@
public void stop() throws JMSException, InterruptedException {
running = false;
- connection.stop();
+ if(connection != null) {
+ connection.stop();
+ }
}
@@ -87,34 +92,46 @@
try {
while (running) {
String result = consume();
- if (result == null && running) {
- throw new Exception(name + "Failed to consume ");
+ if(result != null) {
+ send(result);
+ rate.increment();
+ }
+ else if (running) {
+ LOG.error(name + " Failed to consume!");
}
- send(result);
- rate.increment();
}
} catch (Throwable e) {
e.printStackTrace();
}
}
- protected String consume() throws JMSException {
+ protected String consume() throws Exception {
Connection con = null;
MessageConsumer c = consumer;
if (connectionPerMessage){
con = factory.createConnection();
con.start();
Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- c = s.createConsumer(startDestination);
+ c = s.createConsumer(getConsumeDestination());
}
TextMessage result = (TextMessage) c.receive(timeout);
- if (connectionPerMessage) {
- con.close();
+ if (result != null) {
+ if (audit.isDuplicate(result.getJMSMessageID())) {
+ throw new JMSException("Received duplicate " + result.getText());
+ }
+ if (!audit.isInOrder(result.getJMSMessageID())) {
+ throw new JMSException("Out of order " + result.getText());
+ }
+
+ if (connectionPerMessage) {
+ Thread.sleep(SLEEP_TIME);//give the broker a chance
+ con.close();
+ }
}
return result != null ? result.getText() : null;
}
- protected void send(String text) throws JMSException {
+ protected void send(String text) throws Exception {
Connection con = connection;
MessageProducer p = producer;
Session s = session;
@@ -122,13 +139,13 @@
con = factory.createConnection();
con.start();
s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- p = s.createProducer(nextDestination);
+ p = s.createProducer(getSendDestination());
p.setDeliveryMode(deliveryMode);
}
TextMessage message = s.createTextMessage(text);
p.send(message);
- //System.out.println(name + " SENT " + text + " TO " + nextDestination);
if (connectionPerMessage) {
+ Thread.sleep(SLEEP_TIME);//give the broker a chance
con.close();
}
}
@@ -203,6 +220,14 @@
public void setTimeout(int timeout) {
this.timeout = timeout;
+ }
+
+ protected Destination getSendDestination() {
+ return nextDestination;
+ }
+
+ protected Destination getConsumeDestination() {
+ return startDestination;
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java Mon Jan 7 05:51:06 2008
@@ -19,157 +19,59 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.perf.PerfRate;
/**
* @version $Revision: 1.3 $
*/
-public class LoadController implements Runnable{
- protected ConnectionFactory factory;
- protected Connection connection;
- protected Destination startDestination;
- protected Destination controlDestination;
- protected Session session;
- protected MessageConsumer consumer;
- protected MessageProducer producer;
- protected PerfRate rate = new PerfRate();
- protected int numberOfBatches = 1;
- protected int batchSize = 1000;
- protected int deliveryMode = DeliveryMode.PERSISTENT;
- private boolean connectionPerMessage = false;
- private int timeout = 5000;
- private boolean running = false;
+public class LoadController extends LoadClient{
+ private int numberOfBatches=1;
+ private int batchSize =1000;
+ private int count;
private final CountDownLatch stopped = new CountDownLatch(1);
-
+
- public LoadController(ConnectionFactory factory) {
- this.factory = factory;
+ public LoadController(String name,ConnectionFactory factory) {
+ super(name,factory);
}
-
-
- public synchronized void start() throws JMSException {
- if (!running) {
- rate.reset();
- running = true;
- if (!connectionPerMessage) {
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createConsumer(this.controlDestination);
- producer = session.createProducer(this.startDestination);
- producer.setDeliveryMode(this.deliveryMode);
-
- }
-
- Thread t = new Thread(this);
- t.setName("LoadController");
- t.start();
- }
+
+ public int awaitTestComplete() throws InterruptedException {
+ boolean complete = stopped.await(60*5,TimeUnit.SECONDS);
+ return count;
}
public void stop() throws JMSException, InterruptedException {
running = false;
- stopped.await();
- //stopped.await(1,TimeUnit.SECONDS);
- connection.stop();
+ stopped.countDown();
+ if (connection != null) {
+ this.connection.stop();
+ }
}
-
public void run() {
try {
-
for (int i = 0; i < numberOfBatches; i++) {
for (int j = 0; j < batchSize; j++) {
String payLoad = "batch[" + i + "]no:" + j;
send(payLoad);
+ }
+ for (int j = 0; j < batchSize; j++) {
String result = consume();
- if (result == null || !result.equals(payLoad)) {
- throw new Exception("Failed to consume " + payLoad
- + " GOT " + result);
- }
- System.out.println("Control got " + result);
+ if (result != null) {
+ count++;
rate.increment();
+ }
}
}
-
} catch (Throwable e) {
e.printStackTrace();
} finally {
stopped.countDown();
}
}
-
- protected String consume() throws JMSException {
- Connection con = null;
- MessageConsumer c = consumer;
- if (connectionPerMessage){
- con = factory.createConnection();
- con.start();
- Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- c = s.createConsumer(controlDestination);
- }
- TextMessage result = (TextMessage) c.receive(timeout);
- if (connectionPerMessage) {
- con.close();
- }
- return result != null ? result.getText() : null;
- }
-
- protected void send(String text) throws JMSException {
- Connection con = null;
- MessageProducer p = producer;
- Session s = session;
- if (connectionPerMessage){
- con = factory.createConnection();
- con.start();
- s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- p = s.createProducer(startDestination);
- p.setDeliveryMode(deliveryMode);
- }
- TextMessage message = s.createTextMessage(text);
- p.send(message);
- if (connectionPerMessage) {
- con.close();
- }
- }
-
-
-
- public Destination getStartDestination() {
- return startDestination;
- }
-
-
-
- public void setStartDestination(Destination startDestination) {
- this.startDestination = startDestination;
- }
-
-
-
- public Destination getControlDestination() {
- return controlDestination;
- }
-
-
-
- public void setControlDestination(Destination controlDestination) {
- this.controlDestination = controlDestination;
- }
-
public int getNumberOfBatches() {
@@ -177,57 +79,27 @@
}
-
public void setNumberOfBatches(int numberOfBatches) {
this.numberOfBatches = numberOfBatches;
}
-
public int getBatchSize() {
return batchSize;
}
-
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
-
-
-
- public int getDeliveryMode() {
- return deliveryMode;
- }
-
-
-
- public void setDeliveryMode(int deliveryMode) {
- this.deliveryMode = deliveryMode;
- }
-
-
-
- public boolean isConnectionPerMessage() {
- return connectionPerMessage;
- }
-
-
-
- public void setConnectionPerMessage(boolean connectionPerMessage) {
- this.connectionPerMessage = connectionPerMessage;
- }
-
-
-
- public int getTimeout() {
- return timeout;
+
+ protected Destination getSendDestination() {
+ return startDestination;
}
-
-
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
+
+ protected Destination getConsumeDestination() {
+ return nextDestination;
}
-
+
+
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java Mon Jan 7 05:51:06 2008
@@ -42,12 +42,12 @@
protected LoadClient[] clients;
protected ConnectionFactory factory;
protected Destination destination;
- protected int numberOfClients = 10;
+ protected int numberOfClients = 50;
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected int batchSize = 1000;
- protected int numberOfBatches = 4;
+ protected int numberOfBatches = 10;
protected int timeout = Integer.MAX_VALUE;
- protected boolean connectionPerMessage = true;
+ protected boolean connectionPerMessage = false;
protected Connection managementConnection;
protected Session managementSession;
@@ -66,14 +66,15 @@
Destination startDestination = createDestination(managementSession, getClass()+".start");
Destination endDestination = createDestination(managementSession, getClass()+".end");
- LOG.info("Running with " + numberOfClients + " clients");
- controller = new LoadController(factory);
+ LOG.info("Running with " + numberOfClients + " clients - sending "
+ + numberOfBatches + " batches of " + batchSize + " messages");
+ controller = new LoadController("Controller",factory);
controller.setBatchSize(batchSize);
controller.setNumberOfBatches(numberOfBatches);
controller.setDeliveryMode(deliveryMode);
controller.setConnectionPerMessage(connectionPerMessage);
controller.setStartDestination(startDestination);
- controller.setControlDestination(endDestination);
+ controller.setNextDestination(endDestination);
controller.setTimeout(timeout);
clients = new LoadClient[numberOfClients];
for (int i = 0; i < numberOfClients; i++) {
@@ -147,7 +148,7 @@
clients[i].start();
}
controller.start();
- controller.stop();
+ assertEquals((batchSize* numberOfBatches),controller.awaitTestComplete());
}