You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/07 14:51:08 UTC

svn commit: r609606 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/load/

Author: rajdavies
Date: Mon Jan  7 05:51:06 2008
New Revision: 609606

URL: http://svn.apache.org/viewvc?rev=609606&view=rev
Log:
fixes for load testing

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Mon Jan  7 05:51:06 2008
@@ -176,6 +176,7 @@
 
     public synchronized void gc() {
         for (Message msg : batchList) {
+            rollback(msg.getMessageId());
             msg.decrementReferenceCount();
         }
         cacheEnabled=false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Jan  7 05:51:06 2008
@@ -231,6 +231,7 @@
 
     public synchronized void gc() {
         for (Message msg : batchList.values()) {
+            rollback(msg.getMessageId());
             msg.decrementReferenceCount();
         }
         batchList.clear();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Mon Jan  7 05:51:06 2008
@@ -27,9 +27,13 @@
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ServerSocketFactory;
 
+import org.apache.activemq.ThreadPriorities;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
 import org.apache.activemq.transport.Transport;
@@ -57,6 +61,7 @@
     protected final TcpTransportFactory transportFactory;
     protected long maxInactivityDuration = 30000;
     protected int minmumWireFormatVersion;
+   
     /**
      * trace=true -> the Transport stack where this TcpTransport
      * object will be, will have a TransportLogger layer
@@ -83,11 +88,14 @@
     protected boolean startLogging = true;
     protected Map<String, Object> transportOptions;
     protected final ServerSocketFactory serverSocketFactory;
-
+    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
+    protected Thread socketHandlerThread;
+  
     public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         super(location);
         this.transportFactory = transportFactory;
         this.serverSocketFactory = serverSocketFactory;
+        
     }
 
     public void bind() throws IOException {
@@ -199,18 +207,7 @@
                     if (isStopped() || getAcceptListener() == null) {
                         socket.close();
                     } else {
-                        HashMap<String, Object> options = new HashMap<String, Object>();
-                        options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
-                        options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
-                        options.put("trace", Boolean.valueOf(trace));
-                        options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
-                        options.put("startLogging", Boolean.valueOf(startLogging));
-
-                        options.putAll(transportOptions);
-                        WireFormat format = wireFormatFactory.createWireFormat();
-                        Transport transport = createTransport(socket, format);
-                        Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
-                        getAcceptListener().onAccept(configuredTransport);
+                       socketQueue.put(socket);
                     }
                 }
             } catch (SocketTimeoutException ste) {
@@ -259,6 +256,36 @@
         }
         return result;
     }
+    
+    protected void doStart() throws Exception {
+        Runnable run = new Runnable() {
+            public void run() {
+                try {
+                    while (!isStopped() && !isStopping()) {
+                        Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
+                        if (sock != null) {
+                            handleSocket(sock);
+                        }
+                    }
+
+                } catch (InterruptedException e) {
+                    LOG.info("socketQueue interuppted - stopping");
+                    if (!isStopping()) {
+                        onAcceptError(e);
+                    }
+                }
+
+            }
+
+        };
+        socketHandlerThread = new Thread(null, run,
+                "ActiveMQ Transport Server Thread Handler: " + toString(),
+                getStackSize());
+        socketHandlerThread.setDaemon(true);
+        socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
+        super.doStart();
+        socketHandlerThread.start();
+    }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
         super.doStop(stopper);
@@ -274,4 +301,37 @@
     public void setTransportOption(Map<String, Object> transportOptions) {
         this.transportOptions = transportOptions;
     }
-}
+    
+    protected void handleSocket(Socket socket) {
+        try {
+            HashMap<String, Object> options = new HashMap<String, Object>();
+            options.put("maxInactivityDuration", Long
+                    .valueOf(maxInactivityDuration));
+            options.put("minmumWireFormatVersion", Integer
+                    .valueOf(minmumWireFormatVersion));
+            options.put("trace", Boolean.valueOf(trace));
+            options
+                    .put("dynamicManagement", Boolean
+                            .valueOf(dynamicManagement));
+            options.put("startLogging", Boolean.valueOf(startLogging));
+
+            options.putAll(transportOptions);
+            WireFormat format = wireFormatFactory.createWireFormat();
+            Transport transport = createTransport(socket, format);
+            Transport configuredTransport = transportFactory.serverConfigure(
+                    transport, format, options);
+            getAcceptListener().onAccept(configuredTransport);
+        } catch (SocketTimeoutException ste) {
+            // expect this to happen
+        } catch (Exception e) {
+            if (!isStopping()) {
+                onAcceptError(e);
+            } else if (!isStopped()) {
+                LOG.warn("run()", e);
+                onAcceptError(e);
+            }
+        }
+    }
+    
+    
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadClient.java Mon Jan  7 05:51:06 2008
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.load;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
@@ -29,12 +26,17 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.perf.PerfRate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.3 $
  */
 public class LoadClient implements Runnable{
+    private static final Log LOG = LogFactory.getLog(LoadClient.class);
+    protected static int SLEEP_TIME = 2;
     protected String name;
     protected ConnectionFactory factory;
     protected Connection connection;
@@ -45,9 +47,10 @@
     protected MessageProducer producer;
     protected PerfRate rate = new PerfRate();
     protected int deliveryMode = DeliveryMode.PERSISTENT;
-    private boolean connectionPerMessage = false;
-    private boolean running;
-    private int timeout = 10000;
+    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+    protected boolean connectionPerMessage = false;
+    protected boolean running;
+    protected int timeout = 10000;
     
 
     public LoadClient(String name,ConnectionFactory factory) {
@@ -65,8 +68,8 @@
                 connection = factory.createConnection();
                 connection.start();
                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                consumer = session.createConsumer(this.startDestination);
-                producer = session.createProducer(this.nextDestination);
+                consumer = session.createConsumer(getConsumeDestination());
+                producer = session.createProducer(getSendDestination());
                 producer.setDeliveryMode(this.deliveryMode);
                 
             }
@@ -79,7 +82,9 @@
 
     public void stop() throws JMSException, InterruptedException {
         running = false;
-        connection.stop();
+        if(connection != null) {
+            connection.stop();
+        }
     }
 
     
@@ -87,34 +92,46 @@
         try {
             while (running) {
                 String result = consume();
-                if (result == null && running) {
-                    throw new Exception(name + "Failed to consume ");
+                if(result != null) {
+                    send(result);
+                    rate.increment();
+                }
+                else if (running) {
+                    LOG.error(name + " Failed to consume!");
                 }
-                send(result);
-                rate.increment();
             }
         } catch (Throwable e) {
             e.printStackTrace();
         } 
     }
     
-    protected String consume() throws JMSException {
+    protected String consume() throws Exception {
         Connection con  = null;
         MessageConsumer c = consumer;
         if (connectionPerMessage){
             con = factory.createConnection();
             con.start();
             Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            c = s.createConsumer(startDestination);
+            c = s.createConsumer(getConsumeDestination());
         }
         TextMessage result = (TextMessage) c.receive(timeout);
-        if (connectionPerMessage) {
-            con.close();
+        if (result != null) {
+            if (audit.isDuplicate(result.getJMSMessageID())) {
+                throw new JMSException("Received duplicate " + result.getText());
+            }
+            if (!audit.isInOrder(result.getJMSMessageID())) {
+                throw new JMSException("Out of order " + result.getText());
+            }
+            
+            if (connectionPerMessage) {
+                Thread.sleep(SLEEP_TIME);//give the broker a chance
+                con.close();
+            }
         }
         return result != null ? result.getText() : null;
     }
     
-    protected void send(String text) throws JMSException {
+    protected void send(String text) throws Exception {
         Connection con  = connection;
         MessageProducer p = producer;
         Session s = session;
@@ -122,13 +139,13 @@
             con = factory.createConnection();
             con.start();
             s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            p = s.createProducer(nextDestination);
+            p = s.createProducer(getSendDestination());
             p.setDeliveryMode(deliveryMode);
         }
         TextMessage message = s.createTextMessage(text);
         p.send(message);
-        //System.out.println(name + " SENT " + text + " TO " + nextDestination);
         if (connectionPerMessage) {
+            Thread.sleep(SLEEP_TIME);//give the broker a chance
             con.close();
         }
     }
@@ -203,6 +220,14 @@
 
     public void setTimeout(int timeout) {
         this.timeout = timeout;
+    }
+    
+    protected Destination getSendDestination() {
+        return nextDestination;
+    }
+    
+    protected Destination getConsumeDestination() {
+        return startDestination;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadController.java Mon Jan  7 05:51:06 2008
@@ -19,157 +19,59 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.perf.PerfRate;
 
 /**
  * @version $Revision: 1.3 $
  */
-public class LoadController implements Runnable{
-    protected ConnectionFactory factory;
-    protected Connection connection;
-    protected Destination startDestination;
-    protected Destination controlDestination;
-    protected Session session;
-    protected MessageConsumer consumer;
-    protected MessageProducer producer;
-    protected PerfRate rate = new PerfRate();
-    protected int numberOfBatches = 1;
-    protected int batchSize = 1000;
-    protected int deliveryMode = DeliveryMode.PERSISTENT;
-    private boolean connectionPerMessage = false;
-    private int timeout = 5000;
-    private boolean running = false;
+public class LoadController extends LoadClient{
+    private int numberOfBatches=1;
+    private int batchSize =1000;
+    private int count;
     private final CountDownLatch stopped = new CountDownLatch(1);
-    
+     
 
-    public LoadController(ConnectionFactory factory) {
-       this.factory = factory;
+    public LoadController(String name,ConnectionFactory factory) {
+       super(name,factory);
     }
 
-   
-
-    public synchronized void start() throws JMSException {
-        if (!running) {
-            rate.reset();
-            running = true;
-            if (!connectionPerMessage) {
-                connection = factory.createConnection();
-                connection.start();
-                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                consumer = session.createConsumer(this.controlDestination);
-                producer = session.createProducer(this.startDestination);
-                producer.setDeliveryMode(this.deliveryMode);
-                
-            }
-            
-            Thread t = new  Thread(this);
-            t.setName("LoadController");
-            t.start();
-        }
+       
+    public int awaitTestComplete() throws InterruptedException {
+        boolean complete = stopped.await(60*5,TimeUnit.SECONDS);
+        return count;
     }
 
     public void stop() throws JMSException, InterruptedException {
         running = false;
-        stopped.await();
-        //stopped.await(1,TimeUnit.SECONDS);
-        connection.stop();
+        stopped.countDown();
+        if (connection != null) {
+            this.connection.stop();
+        }
     }
-
     
     public void run() {
         try {
-
             for (int i = 0; i < numberOfBatches; i++) {
                 for (int j = 0; j < batchSize; j++) {
                     String payLoad = "batch[" + i + "]no:" + j;
                     send(payLoad);
+                }
+                for (int j = 0; j < batchSize; j++) {
                     String result = consume();
-                    if (result == null || !result.equals(payLoad)) {
-                        throw new Exception("Failed to consume " + payLoad
-                                + " GOT " + result);
-                    }
-                    System.out.println("Control got " + result);
+                    if (result != null) {
+                        count++;
                     rate.increment();
+                    }
                 }
             }
-
         } catch (Throwable e) {
             e.printStackTrace();
         } finally {
             stopped.countDown();
         }
     }
-    
-    protected String consume() throws JMSException {
-        Connection con  = null;
-        MessageConsumer c = consumer;
-        if (connectionPerMessage){
-            con = factory.createConnection();
-            con.start();
-            Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            c = s.createConsumer(controlDestination);
-        }
-        TextMessage result = (TextMessage) c.receive(timeout);
-        if (connectionPerMessage) {
-            con.close();
-        }
-        return result != null ? result.getText() : null;
-    }
-    
-    protected void send(String text) throws JMSException {
-        Connection con  = null;
-        MessageProducer p = producer;
-        Session s = session;
-        if (connectionPerMessage){
-            con = factory.createConnection();
-            con.start();
-            s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            p = s.createProducer(startDestination);
-            p.setDeliveryMode(deliveryMode);
-        }
-        TextMessage message = s.createTextMessage(text);
-        p.send(message);
-        if (connectionPerMessage) {
-            con.close();
-        }
-    }
-
-
-
-    public Destination getStartDestination() {
-        return startDestination;
-    }
-
-
-
-    public void setStartDestination(Destination startDestination) {
-        this.startDestination = startDestination;
-    }
-
-
-
-    public Destination getControlDestination() {
-        return controlDestination;
-    }
-
-
-
-    public void setControlDestination(Destination controlDestination) {
-        this.controlDestination = controlDestination;
-    }
-
 
 
     public int getNumberOfBatches() {
@@ -177,57 +79,27 @@
     }
 
 
-
     public void setNumberOfBatches(int numberOfBatches) {
         this.numberOfBatches = numberOfBatches;
     }
 
 
-
     public int getBatchSize() {
         return batchSize;
     }
 
 
-
     public void setBatchSize(int batchSize) {
         this.batchSize = batchSize;
     }
-
-
-
-    public int getDeliveryMode() {
-        return deliveryMode;
-    }
-
-
-
-    public void setDeliveryMode(int deliveryMode) {
-        this.deliveryMode = deliveryMode;
-    }
-
-
-
-    public boolean isConnectionPerMessage() {
-        return connectionPerMessage;
-    }
-
-
-
-    public void setConnectionPerMessage(boolean connectionPerMessage) {
-        this.connectionPerMessage = connectionPerMessage;
-    }
-
-
-
-    public int getTimeout() {
-        return timeout;
+    
+    protected Destination getSendDestination() {
+        return startDestination;
     }
-
-
-
-    public void setTimeout(int timeout) {
-        this.timeout = timeout;
+    
+    protected Destination getConsumeDestination() {
+        return nextDestination;
     }
-
+    
+    
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java?rev=609606&r1=609605&r2=609606&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/load/LoadTest.java Mon Jan  7 05:51:06 2008
@@ -42,12 +42,12 @@
     protected LoadClient[] clients;
     protected ConnectionFactory factory;
     protected Destination destination;
-    protected int numberOfClients = 10;
+    protected int numberOfClients = 50;
     protected int deliveryMode = DeliveryMode.PERSISTENT;
     protected int batchSize = 1000;
-    protected int numberOfBatches = 4;
+    protected int numberOfBatches = 10;
     protected int timeout = Integer.MAX_VALUE;
-    protected boolean connectionPerMessage = true;
+    protected boolean connectionPerMessage = false;
     protected Connection managementConnection;
     protected Session managementSession;
 
@@ -66,14 +66,15 @@
         
         Destination startDestination = createDestination(managementSession, getClass()+".start");
         Destination endDestination = createDestination(managementSession, getClass()+".end");
-        LOG.info("Running with " + numberOfClients + " clients");
-        controller = new LoadController(factory);
+        LOG.info("Running with " + numberOfClients + " clients - sending "
+                + numberOfBatches + " batches of " + batchSize + " messages");
+        controller = new LoadController("Controller",factory);
         controller.setBatchSize(batchSize);
         controller.setNumberOfBatches(numberOfBatches);
         controller.setDeliveryMode(deliveryMode);
         controller.setConnectionPerMessage(connectionPerMessage);
         controller.setStartDestination(startDestination);
-        controller.setControlDestination(endDestination);
+        controller.setNextDestination(endDestination);
         controller.setTimeout(timeout);
         clients = new LoadClient[numberOfClients];
         for (int i = 0; i < numberOfClients; i++) {
@@ -147,7 +148,7 @@
             clients[i].start();
         }
         controller.start();
-        controller.stop();
+        assertEquals((batchSize* numberOfBatches),controller.awaitTestComplete());
         
     }