You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/05 10:51:55 UTC

svn commit: r525766 - in /incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid: server/failure/ server/failure/HeapExhaustion.java testutil/ testutil/QpidClientConnection.java

Author: ritchiem
Date: Thu Apr  5 01:51:55 2007
New Revision: 525766

URL: http://svn.apache.org/viewvc?view=rev&rev=525766
Log:
QPID-308 Added test case to demonstrate heap exhaustion of broker. Can't be run InVM as it kills the broker.

Added:
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java   (with props)
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java   (with props)

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java?view=auto&rev=525766
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java Thu Apr  5 01:51:55 2007
@@ -0,0 +1,87 @@
+package org.apache.qpid.server.failure;
+
+import junit.framework.TestCase;
+import org.apache.qpid.testutil.QpidClientConnection;
+
+
+/**
+ * Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour.
+ * <p>
+ * Both tests keep publishing 100K text messages to {@link #queue} without consuming them. Neither test
+ * returns normally: the publishing loop only ends when {@link QpidClientConnection#put} throws, so a
+ * run finishes with a JUnit error once puts start failing.
+ */
+public class HeapExhaustion extends TestCase
+{
+    /** Connection helper shared by all tests; created in {@link #setUp()}. */
+    protected QpidClientConnection conn;
+    /** Broker list handed to {@link QpidClientConnection}. */
+    protected final String BROKER = "localhost";
+    /** Virtual host the tests connect to. */
+    protected final String vhost = "/test";
+    /** Destination the messages are published to. */
+    protected final String queue = "direct://amq.direct//queue";
+
+    /** Payload of 100 * 1024 NUL characters, built in {@link #setUp()}. */
+    protected String hundredK;
+    /** Payload of 1024 * 1024 NUL characters, built in {@link #setUp()}; not used by the tests in this class. */
+    protected String megabyte;
+
+    /**
+     * Writes a progress line to standard output.
+     *
+     * @param msg the text to print
+     */
+    protected void log(String msg)
+    {
+        System.out.println(msg);
+    }
+
+    /**
+     * Builds a payload of the requested length consisting solely of NUL characters.
+     *
+     * @param numBytes number of characters in the payload; must be non-null and non-negative
+     *
+     * @return a string of {@code numBytes} NUL characters
+     */
+    protected String generatePayloadOfSize(Integer numBytes)
+    {
+        // Build from a zero-filled char[] rather than decoding a zero-filled byte[]: the result no longer
+        // depends on the platform default charset.
+        return new String(new char[numBytes]);
+    }
+
+    /**
+     * Connects to the broker, drains anything left on the test queue and prepares the payloads.
+     *
+     * @throws Exception if connecting or draining the queue fails
+     */
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        conn = new QpidClientConnection(BROKER);
+        conn.setVirtualHost(vhost);
+
+        conn.connect();
+        // clear queue
+        log("setup: clearing test queue");
+        conn.consume(queue, 2000);
+
+        hundredK = generatePayloadOfSize(1024 * 100);
+        megabyte = generatePayloadOfSize(1024 * 1024);
+    }
+
+    /**
+     * Disconnects from the broker and then runs the base class tear-down.
+     *
+     * @throws Exception if disconnecting fails
+     */
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            // conn is only null if setUp() never got as far as creating it
+            if (conn != null)
+            {
+                conn.disconnect();
+            }
+        }
+        finally
+        {
+            // run the base class tear-down even when the disconnect fails
+            super.tearDown();
+        }
+    }
+
+
+    /** PUT at maximum rate (although we commit after each PUT) until failure
+     * @throws Exception on error
+     */
+    public void testUntilFailure() throws Exception
+    {
+        putUntilFailure(0L);
+    }
+
+    /** PUT at lower rate (5 per second) until failure
+     * @throws Exception on error
+     */
+    public void testUntilFailureWithDelays() throws Exception
+    {
+        putUntilFailure(200L);
+    }
+
+    /**
+     * Publishes the 100K payload one message at a time, logging a running total, until a put throws.
+     *
+     * @param delayMillis pause after each successful put in milliseconds; values of zero or less mean no pause
+     *
+     * @throws Exception the failure that ended the loop; this method never returns normally
+     */
+    private void putUntilFailure(long delayMillis) throws Exception
+    {
+        final String payload = hundredK;
+        // Size of the base payload only; put() appends a message index to the text it sends.
+        final int size = payload.getBytes().length;
+        int copies = 0;
+        // long, not int: an int total wraps after roughly 21,000 puts of 100K
+        long total = 0L;
+        while (true)
+        {
+            conn.put(queue, payload, 1);
+            copies++;
+            total += size;
+            log("put copy " + copies + " OK for total bytes: " + total);
+            if (delayMillis > 0)
+            {
+                Thread.sleep(delayMillis);
+            }
+        }
+    }
+}

Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java?view=auto&rev=525766
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java Thu Apr  5 01:51:55 2007
@@ -0,0 +1,267 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection implements ExceptionListener
+{
+
+    private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+    private boolean transacted = true;
+    private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+    private Connection connection;
+
+    private String virtualHost;
+    private String brokerlist;
+    private int prefetch;
+    protected Session session;
+    protected boolean connected;
+
+    public QpidClientConnection(String broker)
+    {
+        super();
+        setVirtualHost("/test");
+        setBrokerList(broker);
+        setPrefetch(5000);
+    }
+
+
+    public void connect() throws JMSException
+    {
+        if (!connected)
+        {
+            /*
+            * amqp://[user:pass@][clientid]/virtualhost?
+            * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+            * [&failover='method[?option='value'[&option='value']]']
+            * [&option='value']"
+            */
+            String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+            try
+            {
+                AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+                _logger.info("connecting to Qpid :" + brokerUrl);
+                connection = factory.createConnection();
+
+                // register exception listener
+                connection.setExceptionListener(this);
+
+                session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+                _logger.info("starting connection");
+                connection.start();
+
+                connected = true;
+            }
+            catch (URLSyntaxException e)
+            {
+                throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+            }
+        }
+    }
+
+    public void disconnect() throws JMSException
+    {
+        if (connected)
+        {
+            session.commit();
+            session.close();
+            connection.close();
+            connected = false;
+            _logger.info("disconnected");
+        }
+    }
+
+    public void disconnectWithoutCommit() throws JMSException
+    {
+        if (connected)
+        {
+            session.close();
+            connection.close();
+            connected = false;
+            _logger.info("disconnected without commit");
+        }
+    }
+
+    public String getBrokerList()
+    {
+        return brokerlist;
+    }
+
+    public void setBrokerList(String brokerlist)
+    {
+        this.brokerlist = brokerlist;
+    }
+
+    public String getVirtualHost()
+    {
+        return virtualHost;
+    }
+
+    public void setVirtualHost(String virtualHost)
+    {
+        this.virtualHost = virtualHost;
+    }
+
+    public void setPrefetch(int prefetch)
+    {
+        this.prefetch = prefetch;
+    }
+
+
+    /** override as necessary */
+    public void onException(JMSException exception)
+    {
+        _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+    }
+
+    public boolean isConnected()
+    {
+        return connected;
+    }
+
+    public Session getSession()
+    {
+        return session;
+    }
+
+    /**
+     * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+     *
+     * @param queueName The queue name to put to
+     * @param payload   the content of the payload
+     * @param copies    the number of messages to put
+     *
+     * @throws javax.jms.JMSException any exception that occurs
+     */
+    public void put(String queueName, String payload, int copies) throws JMSException
+    {
+        if (!connected)
+        {
+            connect();
+        }
+
+        _logger.info("putting to queue " + queueName);
+        Queue queue = session.createQueue(queueName);
+
+        final MessageProducer sender = session.createProducer(queue);
+
+        for (int i = 0; i < copies; i++)
+        {
+            Message m = session.createTextMessage(payload + i);
+            m.setIntProperty("index", i + 1);
+            sender.send(m);
+        }
+
+        session.commit();
+        sender.close();
+        _logger.info("put " + copies + " copies");
+    }
+
+    /**
+     * GET the top message on a queue. Consumes the message. Accepts timeout value.
+     *
+     * @param queueName   The quename to get from
+     * @param readTimeout The timeout to use
+     *
+     * @return the content of the text message if any
+     *
+     * @throws javax.jms.JMSException any exception that occured
+     */
+    public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+    {
+        if (!connected)
+        {
+            connect();
+        }
+
+        Queue queue = session.createQueue(queueName);
+
+        final MessageConsumer consumer = session.createConsumer(queue);
+
+        Message message = consumer.receive(readTimeout);
+        session.commit();
+        consumer.close();
+
+        Message result;
+
+        // all messages we consume should be TextMessages
+        if (message instanceof TextMessage)
+        {
+            result = ((TextMessage) message);
+        }
+        else if (null == message)
+        {
+            result = null;
+        }
+        else
+        {
+            _logger.info("warning: received non-text message");
+            result = message;
+        }
+
+        return result;
+    }
+
+    /**
+     * GET the top message on a queue. Consumes the message.
+     *
+     * @param queueName The Queuename to get from
+     *
+     * @return The string content of the text message, if any received
+     *
+     * @throws javax.jms.JMSException any exception that occurs
+     */
+    public Message getNextMessage(String queueName) throws JMSException
+    {
+        return getNextMessage(queueName, 0);
+    }
+
+    /**
+     * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+     *
+     * @param queueName   The Queue name to consume from
+     * @param readTimeout The timeout for each consume
+     *
+     * @throws javax.jms.JMSException Any exception that occurs during the consume
+     * @throws InterruptedException   If the consume thread was interrupted during a consume.
+     */
+    public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+    {
+        if (!connected)
+        {
+            connect();
+        }
+
+        _logger.info("consuming queue " + queueName);
+        Queue queue = session.createQueue(queueName);
+
+        final MessageConsumer consumer = session.createConsumer(queue);
+        int messagesReceived = 0;
+
+        _logger.info("consuming...");
+        while ((consumer.receive(readTimeout)) != null)
+        {
+            messagesReceived++;
+        }
+
+        session.commit();
+        consumer.close();
+        _logger.info("consumed: " + messagesReceived);
+    }
+}

Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date