You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/03/13 11:35:47 UTC

svn commit: r517638 [2/2] - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/serv...

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Mar 13 03:35:42 2007
@@ -39,8 +39,8 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
 
 public class PropertyValueTest extends TestCase implements MessageListener
 {
@@ -59,19 +59,13 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        try
-        {
-            init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
-        }
-        catch (Exception e)
-        {
-            fail("Unable to initialilse connection: " + e);
-        }
+        TransportConnection.createVMBroker(1);
     }
 
     protected void tearDown() throws Exception
     {
         super.tearDown();
+        TransportConnection.killVMBroker(1);
     }
 
     private void init(AMQConnection connection) throws Exception
@@ -91,14 +85,48 @@
         connection.start();
     }
 
-    public void test() throws Exception
+    public void testOnce()
+    {
+        runBatch(1);
+    }
+
+    public void test50()
+    {
+        runBatch(50);
+    }
+
+    private void runBatch(int runSize)
     {
-        int count = _count;
-        send(count);
-        waitFor(count);
-        check();
-        _logger.info("Completed without failure");
-        _connection.close();
+        try
+        {
+            int run = 0;
+            while (run < runSize)
+            {
+                _logger.error("Run Number:" + run++);
+                try
+                {
+                    init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+                }
+                catch (Exception e)
+                {
+                    fail("Unable to initialilse connection: " + e);
+                }
+
+                int count = _count;
+                send(count);
+                waitFor(count);
+                check();
+                _logger.info("Completed without failure");
+                _connection.close();
+
+                _logger.error("End Run Number:" + (run - 1));
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.error(e.getMessage(), e);
+            e.printStackTrace();
+        }
     }
 
     void send(int count) throws JMSException
@@ -138,7 +166,7 @@
             m.setJMSReplyTo(q);
             m.setStringProperty("TempQueue", q.toString());
 
-            _logger.info("Message:" + m);
+            _logger.trace("Message:" + m);
 
             Assert.assertEquals("Check temp queue has been set correctly",
                                 m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue"));
@@ -150,7 +178,7 @@
             m.setShortProperty("Short", (short) Short.MAX_VALUE);
             m.setStringProperty("String", "Test");
 
-            _logger.info("Sending Msg:" + m);
+            _logger.debug("Sending Msg:" + m);
             producer.send(m);
         }
     }
@@ -206,8 +234,11 @@
             Assert.assertEquals("Check String properties are correctly transported",
                                 "Test", m.getStringProperty("String"));
         }
+        received.clear();
 
         assertEqual(messages.iterator(), actual.iterator());
+
+        messages.clear();
     }
 
     private static void assertEqual(Iterator expected, Iterator actual)
@@ -269,11 +300,11 @@
         {
             test._count = Integer.parseInt(argv[1]);
         }
-        test.test();
+        test.testOnce();
     }
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class));
+        return new junit.framework.TestSuite(PropertyValueTest.class);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Tue Mar 13 03:35:42 2007
@@ -42,6 +42,7 @@
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.url.URLSyntaxException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.testutil.QpidClientConnection;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
 
@@ -62,14 +63,14 @@
     private boolean testReception = true;
 
     private long[] receieved = new long[numTestMessages + 1];
-    private boolean passed=false;
+    private boolean passed = false;
 
     protected void setUp() throws Exception
     {
         super.setUp();
         TransportConnection.createVMBroker(1);
 
-        QpidClientConnection conn = new QpidClientConnection();
+        QpidClientConnection conn = new QpidClientConnection(BROKER);
 
         conn.connect();
         // clear queue
@@ -85,21 +86,28 @@
     {
         super.tearDown();
 
-        if (!passed)
+        if (!passed)   // clean up
         {
-            QpidClientConnection conn = new QpidClientConnection();
+            QpidClientConnection conn = new QpidClientConnection(BROKER);
 
             conn.connect();
             // clear queue
             conn.consume(queue, consumeTimeout);
+
+            conn.disconnect();
         }
         TransportConnection.killVMBroker(1);
     }
 
-    /** multiple consumers */
+    /**
+     * multiple consumers
+     *
+     * @throws javax.jms.JMSException if a JMS problem occurs
+     * @throws InterruptedException   on timeout
+     */
     public void testDrain() throws JMSException, InterruptedException
     {
-        QpidClientConnection conn = new QpidClientConnection();
+        QpidClientConnection conn = new QpidClientConnection(BROKER);
 
         conn.connect();
 
@@ -170,6 +178,7 @@
         assertEquals(list.toString(), 0, failed);
         _logger.info("consumed: " + messagesReceived);
         conn.disconnect();
+        passed = true;
     }
 
     /** multiple consumers */
@@ -186,8 +195,8 @@
         Thread t4 = new Thread(c4);
 
         t1.start();
-//        t2.start();
-//        t3.start();
+        t2.start();
+        t3.start();
 //        t4.start();
 
         try
@@ -230,7 +239,7 @@
         }
         assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
         assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
-        passed=true;
+        passed = true;
     }
 
     class Consumer implements Runnable
@@ -248,7 +257,7 @@
             try
             {
                 _logger.info("consumer-" + id + ": starting");
-                QpidClientConnection conn = new QpidClientConnection();
+                QpidClientConnection conn = new QpidClientConnection(BROKER);
 
                 conn.connect();
 
@@ -318,286 +327,51 @@
     }
 
 
-    public class QpidClientConnection implements ExceptionListener
+    public void testRequeue() throws JMSException, AMQException, URLSyntaxException
     {
-        private boolean transacted = true;
-        private int ackMode = Session.CLIENT_ACKNOWLEDGE;
-        private Connection connection;
-
-        private String virtualHost;
-        private String brokerlist;
-        private int prefetch;
-        protected Session session;
-        protected boolean connected;
-
-        public QpidClientConnection()
+        int run = 0;
+        while (run < 10)
         {
-            super();
-            setVirtualHost("/test");
-            setBrokerList(BROKER);
-            setPrefetch(5000);
-        }
-
-
-        public void connect() throws JMSException
-        {
-            if (!connected)
-            {
-                /*
-                * amqp://[user:pass@][clientid]/virtualhost?
-                * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
-                * [&failover='method[?option='value'[&option='value']]']
-                * [&option='value']"
-                */
-                String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
-                try
-                {
-                    AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
-                    _logger.info("connecting to Qpid :" + brokerUrl);
-                    connection = factory.createConnection();
-
-                    // register exception listener
-                    connection.setExceptionListener(this);
-
-                    session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+            run++;
 
-
-                    _logger.info("starting connection");
-                    connection.start();
-
-                    connected = true;
-                }
-                catch (URLSyntaxException e)
-                {
-                    throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
-                }
-            }
-        }
-
-        public void disconnect() throws JMSException
-        {
-            if (connected)
-            {
-                session.commit();
-                session.close();
-                connection.close();
-                connected = false;
-                _logger.info("disconnected");
-            }
-        }
-
-        public void disconnectWithoutCommit() throws JMSException
-        {
-            if (connected)
-            {
-                session.close();
-                connection.close();
-                connected = false;
-                _logger.info("disconnected without commit");
-            }
-        }
-
-        public String getBrokerList()
-        {
-            return brokerlist;
-        }
-
-        public void setBrokerList(String brokerlist)
-        {
-            this.brokerlist = brokerlist;
-        }
-
-        public String getVirtualHost()
-        {
-            return virtualHost;
-        }
-
-        public void setVirtualHost(String virtualHost)
-        {
-            this.virtualHost = virtualHost;
-        }
-
-        public void setPrefetch(int prefetch)
-        {
-            this.prefetch = prefetch;
-        }
-
-
-        /** override as necessary */
-        public void onException(JMSException exception)
-        {
-            _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
-        }
-
-        public boolean isConnected()
-        {
-            return connected;
-        }
-
-        public Session getSession()
-        {
-            return session;
-        }
-
-        /**
-         * Put a String as a text messages, repeat n times. A null payload will result in a null message.
-         *
-         * @param queueName The queue name to put to
-         * @param payload   the content of the payload
-         * @param copies    the number of messages to put
-         *
-         * @throws javax.jms.JMSException any exception that occurs
-         */
-        public void put(String queueName, String payload, int copies) throws JMSException
-        {
-            if (!connected)
-            {
-                connect();
-            }
-
-            _logger.info("putting to queue " + queueName);
-            Queue queue = session.createQueue(queueName);
-
-            final MessageProducer sender = session.createProducer(queue);
-
-            for (int i = 0; i < copies; i++)
-            {
-                Message m = session.createTextMessage(payload + i);
-                m.setIntProperty("index", i + 1);
-                sender.send(m);
-            }
-
-            session.commit();
-            sender.close();
-            _logger.info("put " + copies + " copies");
-        }
-
-        /**
-         * GET the top message on a queue. Consumes the message. Accepts timeout value.
-         *
-         * @param queueName   The quename to get from
-         * @param readTimeout The timeout to use
-         *
-         * @return the content of the text message if any
-         *
-         * @throws javax.jms.JMSException any exception that occured
-         */
-        public Message getNextMessage(String queueName, long readTimeout) throws JMSException
-        {
-            if (!connected)
+            if (_logger.isInfoEnabled())
             {
-                connect();
+                _logger.info("testRequeue run " + run);
             }
 
-            Queue queue = session.createQueue(queueName);
+            String virtualHost = "/test";
+            String brokerlist = BROKER;
+            String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
 
-            final MessageConsumer consumer = session.createConsumer(queue);
+            Connection conn = new AMQConnection(brokerUrl);
+            Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue q = session.createQueue(queue);
 
-            Message message = consumer.receive(readTimeout);
-            session.commit();
-            consumer.close();
-
-            Message result;
+            _logger.debug("Create Consumer");
+            MessageConsumer consumer = session.createConsumer(q);
 
-            // all messages we consume should be TextMessages
-            if (message instanceof TextMessage)
-            {
-                result = ((TextMessage) message);
-            }
-            else if (null == message)
-            {
-                result = null;
-            }
-            else
+            try
             {
-                _logger.info("warning: received non-text message");
-                result = message;
+                Thread.sleep(2000);
             }
-
-            return result;
-        }
-
-        /**
-         * GET the top message on a queue. Consumes the message.
-         *
-         * @param queueName The Queuename to get from
-         *
-         * @return The string content of the text message, if any received
-         *
-         * @throws javax.jms.JMSException any exception that occurs
-         */
-        public Message getNextMessage(String queueName) throws JMSException
-        {
-            return getNextMessage(queueName, 0);
-        }
-
-        /**
-         * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
-         *
-         * @param queueName   The Queue name to consume from
-         * @param readTimeout The timeout for each consume
-         *
-         * @throws javax.jms.JMSException Any exception that occurs during the consume
-         * @throws InterruptedException   If the consume thread was interrupted during a consume.
-         */
-        public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
-        {
-            if (!connected)
+            catch (InterruptedException e)
             {
-                connect();
+                //
             }
 
-            _logger.info("consuming queue " + queueName);
-            Queue queue = session.createQueue(queueName);
+            _logger.debug("Receiving msg");
+            Message msg = consumer.receive(1000);
 
-            final MessageConsumer consumer = session.createConsumer(queue);
-            int messagesReceived = 0;
+            assertNotNull("Message should not be null", msg);
 
-            _logger.info("consuming...");
-            while ((consumer.receive(readTimeout)) != null)
-            {
-                messagesReceived++;
-            }
 
-            session.commit();
+            // As we have not ack'd message will be requeued.
+            _logger.debug("Close Consumer");
             consumer.close();
-            _logger.info("consumed: " + messagesReceived);
-        }
-    }
-
-
-    public void testRequeue() throws JMSException, AMQException, URLSyntaxException
-    {
-        String virtualHost = "/test";
-        String brokerlist = "vm://:1";
-        String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
-
-        Connection conn = new AMQConnection(brokerUrl);
-        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue q = session.createQueue(queue);
-
-        _logger.info("Create Consumer");
-        MessageConsumer consumer = session.createConsumer(q);
 
-        try
-        {
-            Thread.sleep(2000);
-        }
-        catch (InterruptedException e)
-        {
-            //
+            _logger.debug("Close Connection");
+            conn.close();
         }
-
-        _logger.info("Receiving msg");
-        Message msg = consumer.receive();
-
-        assertNotNull("Message should not be null", msg);
-
-        _logger.info("Close Consumer");
-        consumer.close();
-
-        _logger.info("Close Connection");
-        conn.close();
     }
 
 }

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java?view=auto&rev=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java Tue Mar 13 03:35:42 2007
@@ -0,0 +1,268 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection implements ExceptionListener
+{
+
+    private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+    private boolean transacted = true;
+    private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+    private Connection connection;
+
+    private String virtualHost;
+    private String brokerlist;
+    private int prefetch;
+    protected Session session;
+    protected boolean connected;
+
+    public QpidClientConnection(String broker)
+    {
+        super();
+        setVirtualHost("/test");
+        setBrokerList(broker);
+        setPrefetch(5000);
+    }
+
+
+    public void connect() throws JMSException
+    {
+        if (!connected)
+        {
+            /*
+            * amqp://[user:pass@][clientid]/virtualhost?
+            * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+            * [&failover='method[?option='value'[&option='value']]']
+            * [&option='value']"
+            */
+            String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+            try
+            {
+                AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+                _logger.info("connecting to Qpid :" + brokerUrl);
+                connection = factory.createConnection();
+
+                // register exception listener
+                connection.setExceptionListener(this);
+
+                session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+                _logger.info("starting connection");
+                connection.start();
+
+                connected = true;
+            }
+            catch (URLSyntaxException e)
+            {
+                throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+            }
+        }
+    }
+
+    public void disconnect() throws JMSException
+    {
+        if (connected)
+        {
+            session.commit();
+            session.close();
+            connection.close();
+            connected = false;
+            _logger.info("disconnected");
+        }
+    }
+
+    public void disconnectWithoutCommit() throws JMSException
+    {
+        if (connected)
+        {
+            session.close();
+            connection.close();
+            connected = false;
+            _logger.info("disconnected without commit");
+        }
+    }
+
+    public String getBrokerList()
+    {
+        return brokerlist;
+    }
+
+    public void setBrokerList(String brokerlist)
+    {
+        this.brokerlist = brokerlist;
+    }
+
+    public String getVirtualHost()
+    {
+        return virtualHost;
+    }
+
+    public void setVirtualHost(String virtualHost)
+    {
+        this.virtualHost = virtualHost;
+    }
+
+    public void setPrefetch(int prefetch)
+    {
+        this.prefetch = prefetch;
+    }
+
+
+    /** override as necessary */
+    public void onException(JMSException exception)
+    {
+        _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+    }
+
+    public boolean isConnected()
+    {
+        return connected;
+    }
+
+    public Session getSession()
+    {
+        return session;
+    }
+
+    /**
+     * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+     *
+     * @param queueName The queue name to put to
+     * @param payload   the content of the payload
+     * @param copies    the number of messages to put
+     *
+     * @throws javax.jms.JMSException any exception that occurs
+     */
+    public void put(String queueName, String payload, int copies) throws JMSException
+    {
+        if (!connected)
+        {
+            connect();
+        }
+
+        _logger.info("putting to queue " + queueName);
+        Queue queue = session.createQueue(queueName);
+
+        final MessageProducer sender = session.createProducer(queue);
+
+        for (int i = 0; i < copies; i++)
+        {
+            Message m = session.createTextMessage(payload + i);
+            m.setIntProperty("index", i + 1);
+            sender.send(m);
+        }
+
+        session.commit();
+        sender.close();
+        _logger.info("put " + copies + " copies");
+    }
+
+    /**
+     * GET the top message on a queue. Consumes the message. Accepts timeout value.
+     *
+     * @param queueName   The quename to get from
+     * @param readTimeout The timeout to use
+     *
+     * @return the content of the text message if any
+     *
+     * @throws javax.jms.JMSException any exception that occured
+     */
+    public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+    {
+        if (!connected)
+        {
+            connect();
+        }
+
+        Queue queue = session.createQueue(queueName);
+
+        final MessageConsumer consumer = session.createConsumer(queue);
+
+        Message message = consumer.receive(readTimeout);
+        session.commit();
+        consumer.close();
+
+        Message result;
+
+        // all messages we consume should be TextMessages
+        if (message instanceof TextMessage)
+        {
+            result = ((TextMessage) message);
+        }
+        else if (null == message)
+        {
+            result = null;
+        }
+        else
+        {
+            _logger.info("warning: received non-text message");
+            result = message;
+        }
+
+        return result;
+    }
+
+    /**
+     * GET the top message on a queue. Consumes the message.
+     *
+     * @param queueName The Queuename to get from
+     *
+     * @return The string content of the text message, if any received
+     *
+     * @throws javax.jms.JMSException any exception that occurs
+     */
+    public Message getNextMessage(String queueName) throws JMSException
+    {
+        return getNextMessage(queueName, 0);
+    }
+
+    /**
+     * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+     *
+     * @param queueName   The Queue name to consume from
+     * @param readTimeout The timeout for each consume
+     *
+     * @throws javax.jms.JMSException Any exception that occurs during the consume
+     * @throws InterruptedException   If the consume thread was interrupted during a consume.
+     */
+    public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+    {
+        if (!connected)
+        {
+            connect();
+        }
+
+        _logger.info("consuming queue " + queueName);
+        Queue queue = session.createQueue(queueName);
+
+        final MessageConsumer consumer = session.createConsumer(queue);
+        int messagesReceived = 0;
+
+        _logger.info("consuming...");
+        while ((consumer.receive(readTimeout)) != null)
+        {
+            messagesReceived++;
+        }
+
+        session.commit();
+        consumer.close();
+        _logger.info("consumed: " + messagesReceived);
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java?view=diff&rev=517638&r1=517637&r2=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java Tue Mar 13 03:35:42 2007
@@ -181,8 +181,37 @@
     @Override
     public Iterator<E> iterator()
     {
-        throw new RuntimeException("Not Implemented");
+        final Iterator<E> mainMessageIterator = super.iterator();
+        return new Iterator<E>()
+        {
+            final Iterator<E> _headIterator = _messageHead.iterator();
+            final Iterator<E> _mainIterator = mainMessageIterator;
 
+            Iterator<E> last;
+
+            public boolean hasNext()
+            {
+                return _headIterator.hasNext() || _mainIterator.hasNext();
+            }
+
+            public E next()
+            {
+                if (_headIterator.hasNext())
+                {
+                    last = _headIterator;
+                    return _headIterator.next();
+                }
+                else
+                {
+                    last = _mainIterator;
+                    return _mainIterator.next();
+                }
+            }
+            public void remove()
+            {
+                last.remove();
+            }
+        };
     }
 
     @Override

Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java?view=auto&rev=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java Tue Mar 13 03:35:42 2007
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+
+public class VMTestCase extends TestCase
+{
+    protected long RECEIVE_TIMEOUT = 1000l;    // 1 sec
+    protected long CLOSE_TIMEOUT = 10000l;     // 10 secs
+
+    protected Context _context;
+    protected String _clientID;
+    protected String _virtualhost;
+    protected String _brokerlist;
+
+    protected final Map<String, String> _connections = new HashMap<String, String>();
+    protected final Map<String, String> _queues = new HashMap<String, String>();
+    protected final Map<String, String> _topics = new HashMap<String, String>();
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        try
+        {
+            TransportConnection.createVMBroker(1);
+        }
+        catch (Exception e)
+        {
+            fail("Unable to create broker: " + e);
+        }
+
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        if (_clientID == null)
+        {
+            _clientID = this.getClass().getName();
+        }
+
+        if (_virtualhost == null)
+        {
+            _virtualhost = "/test";
+        }
+
+        if (_brokerlist == null)
+        {
+            _brokerlist = "vm://:1";
+        }
+
+        env.put("connectionfactory.connection", "amqp://client:client@" +
+                                                _clientID + _virtualhost + "?brokerlist='" + _brokerlist + "'");
+
+        for (Map.Entry<String, String> c : _connections.entrySet())
+        {
+            env.put("connectionfactory." + c.getKey(), c.getValue());
+        }
+
+        env.put("queue.queue", "queue");
+
+        for (Map.Entry<String, String> q : _queues.entrySet())
+        {
+            env.put("queue." + q.getKey(), q.getValue());
+        }
+
+        env.put("topic.topic", "topic");
+
+        for (Map.Entry<String, String> t : _topics.entrySet())
+        {
+            env.put("topic." + t.getKey(), t.getValue());
+        }
+
+        _context = factory.getInitialContext(env);
+    }
+
+    protected void tearDown() throws Exception
+    {
+        TransportConnection.killVMBroker(1);
+        super.tearDown();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/VMTestCase.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java?view=auto&rev=517638
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java (added)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java Tue Mar 13 03:35:42 2007
@@ -0,0 +1,150 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+import org.apache.log4j.Logger;
+
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
+import javax.jms.QueueReceiver;
+import javax.jms.Message;
+import java.util.Enumeration;
+
+public class QueueBrowserTest extends VMTestCase
+{
+    private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class);
+
+    private static final int MSG_COUNT = 10;
+
+    private Connection _clientConnection;
+    private Session _clientSession;
+    private Queue _queue;
+
+    public void setUp() throws Exception
+    {
+
+        super.setUp();
+
+        _queue = (Queue) _context.lookup("queue");
+
+        //Create Client
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        _clientSession.createConsumer(_queue).close();
+
+        //Create Producer put some messages on the queue
+        Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.send(producerSession.createTextMessage("Message " + msg));
+        }
+
+        producerConnection.close();
+
+    }
+
+    /*
+    * Test Messages Remain on Queue
+    * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there
+    *
+    */
+
+    public void queueBrowserMsgsRemainOnQueueTest() throws JMSException
+    {
+
+        // create QueueBrowser
+        _logger.info("Creating Queue Browser");
+
+        QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
+
+        // check for messages
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Checking for " + MSG_COUNT + " messages with QueueBrowser");
+        }
+
+        int msgCount = 0;
+        Enumeration msgs = queueBrowser.getEnumeration();
+
+        while (msgs.hasMoreElements())
+        {
+            msgs.nextElement();
+            msgCount++;
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Found " + msgCount + " messages total in browser");
+        }
+
+        // check to see if all messages found
+//        assertEquals("browser did not find all messages", MSG_COUNT, msgCount);
+        if (msgCount != MSG_COUNT)
+        {
+            _logger.warn(msgCount + "/" + MSG_COUNT + " messages received.");
+        }
+
+        //Close browser
+        queueBrowser.close();
+
+        // VERIFY
+
+        // continue and try to receive all messages
+        MessageConsumer consumer = _clientSession.createConsumer(_queue);
+
+        _logger.info("Verify messages are still on the queue");
+
+        Message tempMsg;
+
+        for (msgCount = 0; msgCount < MSG_COUNT; msgCount++)
+        {
+            tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+            if (tempMsg == null)
+            {
+                fail("Message " + msgCount + " not retrieved from queue");
+            }
+        }
+
+        _logger.info("All messages recevied from queue");
+    }
+
+
+}

Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date