You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/28 17:40:23 UTC

svn commit: r580389 - in /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid: test/unit/basic/ test/unit/topic/ test/unit/transacted/ testutil/

Author: arnaudsimon
Date: Fri Sep 28 08:40:21 2007
New Revision: 580389

URL: http://svn.apache.org/viewvc?rev=580389&view=rev
Log: (empty)

Modified:
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java Fri Sep 28 08:40:21 2007
@@ -21,7 +21,6 @@
 package org.apache.qpid.test.unit.basic;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
@@ -29,6 +28,7 @@
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.testutil.QpidTestCase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@
 import java.util.Iterator;
 import java.util.List;
 
-public class TextMessageTest extends TestCase implements MessageListener
+public class TextMessageTest extends QpidTestCase implements MessageListener
 {
     private static final Logger _logger = LoggerFactory.getLogger(TextMessageTest.class);
 
@@ -62,7 +62,7 @@
         super.setUp();
         try
         {
-            init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+            init((AMQConnection) getConnection("guest", "guest"));
         }
         catch (Exception e)
         {
@@ -89,7 +89,15 @@
         _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         // set up a slow consumer
-        _session.createConsumer(destination).setMessageListener(this);
+        try
+        {
+            _session.createConsumer(destination).setMessageListener(this);
+        }
+        catch (Throwable  e)
+        {
+// TODO
+            e.printStackTrace();
+        }
         connection.start();
     }
 
@@ -117,6 +125,7 @@
             _logger.info("Sending Msg:" + m);
             producer.send(m);
         }
+        _logger.info("sent " + count  + " mesages");
     }
 
     void waitFor(int count) throws InterruptedException
@@ -227,6 +236,7 @@
     {
         synchronized (received)
         {
+            _logger.info("===== received one message");
             received.add((JMSTextMessage) message);
             received.notify();
         }
@@ -237,21 +247,10 @@
         return in + System.currentTimeMillis();
     }
 
-    public static void main(String[] argv) throws Exception
-    {
-        TextMessageTest test = new TextMessageTest();
-        test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0];
-        test.setUp();
-        if (argv.length > 1)
-        {
-            test._count = Integer.parseInt(argv[1]);
-        }
 
-        test.test();
-    }
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(TextMessageTest.class));
+         return new junit.framework.TestSuite(TextMessageTest.class);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Fri Sep 28 08:40:21 2007
@@ -36,30 +36,29 @@
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.testutil.QpidTestCase;
 
 
 /** @author Apache Software Foundation */
-public class TopicSessionTest extends TestCase
+public class TopicSessionTest extends QpidTestCase
 {
     private static final String BROKER = "vm://:1";
 
     protected void setUp() throws Exception
     {
         super.setUp();
-        TransportConnection.createVMBroker(1);
     }
 
     protected void tearDown() throws Exception
     {
         super.tearDown();
-        TransportConnection.killAllVMBrokers();
     }
 
 
     public void testTopicSubscriptionUnsubscription() throws Exception
     {
 
-        AMQConnection con = new AMQConnection(BROKER+"?retries='0'", "guest", "guest", "test", "test");
+        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
         TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
@@ -104,7 +103,7 @@
 
     private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
     {
-        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
         AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
 
@@ -143,13 +142,13 @@
 
     public void testUnsubscriptionAfterConnectionClose() throws Exception
     {
-        AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+        AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con1, "MyTopic3");
 
         TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(topic);
 
-        AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
+        AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
         TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
 
@@ -174,7 +173,7 @@
     public void testTextMessageCreation() throws Exception
     {
 
-        AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQTopic topic = new AMQTopic(con, "MyTopic4");
         TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicPublisher publisher = session1.createPublisher(topic);
@@ -214,7 +213,7 @@
 
     public void testSendingSameMessage() throws Exception
     {
-        AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+        AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         TemporaryTopic topic = session.createTemporaryTopic();
         assertNotNull(topic);
@@ -237,7 +236,7 @@
 
     public void testTemporaryTopic() throws Exception
     {
-        AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+        AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         TemporaryTopic topic = session.createTemporaryTopic();
         assertNotNull(topic);
@@ -289,7 +288,7 @@
     public void testNoLocal() throws Exception
     {
 
-        AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test");
+        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
 
         AMQTopic topic = new AMQTopic(con, "testNoLocal");
 
@@ -341,7 +340,7 @@
         m = (TextMessage) noLocal.receive(100);
         assertNull(m);
 
-        AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test2", "test");
+        AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
         TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicPublisher publisher2 = session2.createPublisher(topic);
 

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Fri Sep 28 08:40:21 2007
@@ -22,6 +22,7 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.testutil.QpidTestCase;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.url.URLSyntaxException;
@@ -41,7 +42,7 @@
  *
  * Assumptions; - Assumes empty Queue
  */
-public class CommitRollbackTest extends TestCase
+public class CommitRollbackTest extends QpidTestCase
 {
     protected AMQConnection conn;
     protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
@@ -54,7 +55,6 @@
     Queue _jmsQueue;
 
     private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
-    private static final String BROKER = "vm://:1";
     private boolean _gotone = false;
     private boolean _gottwo = false;
     private boolean _gottwoRedelivered = false;
@@ -62,20 +62,14 @@
     protected void setUp() throws Exception
     {
         super.setUp();
-        if (BROKER.startsWith("vm"))
-        {
-            TransportConnection.createVMBroker(1);
-        }
-
         testMethod++;
         queue += testMethod;
-
         newConnection();
     }
 
-    private void newConnection() throws AMQException, URLSyntaxException, JMSException
+    private void newConnection() throws Exception
     {
-        conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='" + BROKER + "'");
+        conn = (AMQConnection) getConnection("guest", "guest");
 
         _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
 
@@ -92,12 +86,7 @@
     protected void tearDown() throws Exception
     {
         super.tearDown();
-
         conn.close();
-        if (BROKER.startsWith("vm"))
-        {
-            TransportConnection.killVMBroker(1);
-        }
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Fri Sep 28 08:40:21 2007
@@ -27,6 +27,8 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.testutil.QpidTestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,233 +60,275 @@
 
     protected void setUp() throws Exception
     {
-        super.setUp();
-        _logger.info("Create Connection");
-        con = (AMQConnection) getConnection("guest", "guest");
-        _logger.info("Create Session");
-        session = con.createSession(true, Session.SESSION_TRANSACTED);
-        _logger.info("Create Q1");
-        queue1 =
-            new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false,
-                true);
-        _logger.info("Create Q2");
-        AMQQueue queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
-
-        _logger.info("Create Consumer of Q1");
-        consumer1 = session.createConsumer(queue1);
-        // Dummy just to create the queue.
-        _logger.info("Create Consumer of Q2");
-        MessageConsumer consumer2 = session.createConsumer(queue2);
-        _logger.info("Close Consumer of Q2");
-        consumer2.close();
-
-        _logger.info("Create producer to Q2");
-        producer2 = session.createProducer(queue2);
-
-        _logger.info("Start Connection");
-        con.start();
-
-        _logger.info("Create prep connection");
-        prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
-
-        _logger.info("Create prep session");
-        prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
-
-        _logger.info("Create prep producer to Q1");
-        prepProducer1 = prepSession.createProducer(queue1);
-
-        _logger.info("Create prep connection start");
-        prepCon.start();
-
-        _logger.info("Create test connection");
-        testCon = (AMQConnection) getConnection("guest", "guest");
-        _logger.info("Create test session");
-        testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
-        _logger.info("Create test consumer of q2");
-        testConsumer2 = testSession.createConsumer(queue2);
+        try
+        {
+            super.setUp();
+            _logger.info("Create Connection");
+            con = (AMQConnection) getConnection("guest", "guest");
+            _logger.info("Create Session");
+            session = con.createSession(true, Session.SESSION_TRANSACTED);
+            _logger.info("Create Q1");
+            queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"),
+                                  new AMQShortString("Q1"), false, true);
+            _logger.info("Create Q2");
+            AMQQueue queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
+
+            _logger.info("Create Consumer of Q1");
+            consumer1 = session.createConsumer(queue1);
+            // Dummy just to create the queue.
+            _logger.info("Create Consumer of Q2");
+            MessageConsumer consumer2 = session.createConsumer(queue2);
+            _logger.info("Close Consumer of Q2");
+            consumer2.close();
+
+            _logger.info("Create producer to Q2");
+            producer2 = session.createProducer(queue2);
+
+            _logger.info("Start Connection");
+            con.start();
+
+            _logger.info("Create prep connection");
+            prepCon = (AMQConnection) getConnection("guest", "guest");
+
+            _logger.info("Create prep session");
+            prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+            _logger.info("Create prep producer to Q1");
+            prepProducer1 = prepSession.createProducer(queue1);
+
+            _logger.info("Create prep connection start");
+            prepCon.start();
+
+            _logger.info("Create test connection");
+            testCon = (AMQConnection) getConnection("guest", "guest");
+            _logger.info("Create test session");
+            testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+            _logger.info("Create test consumer of q2");
+            testConsumer2 = testSession.createConsumer(queue2);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            stopBroker();
+            throw e;
+        }
     }
 
     protected void tearDown() throws Exception
     {
-        _logger.info("Close connection");
-        con.close();
-        _logger.info("Close test connection");
-        testCon.close();
-        _logger.info("Close prep connection");
-        prepCon.close();
-        super.tearDown();
+        try
+        {
+            _logger.info("Close connection");
+            con.close();
+            _logger.info("Close test connection");
+            testCon.close();
+            _logger.info("Close prep connection");
+            prepCon.close();
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+        finally
+        {
+            super.tearDown();
+        }
     }
 
     public void testCommit() throws Exception
     {
-        // add some messages
-        _logger.info("Send prep A");
-        prepProducer1.send(prepSession.createTextMessage("A"));
-        _logger.info("Send prep B");
-        prepProducer1.send(prepSession.createTextMessage("B"));
-        _logger.info("Send prep C");
-        prepProducer1.send(prepSession.createTextMessage("C"));
-
-        // send and receive some messages
-        _logger.info("Send X to Q2");
-        producer2.send(session.createTextMessage("X"));
-        _logger.info("Send Y to Q2");
-        producer2.send(session.createTextMessage("Y"));
-        _logger.info("Send Z to Q2");
-        producer2.send(session.createTextMessage("Z"));
-
-        _logger.info("Read A from Q1");
-        expect("A", consumer1.receive(1000));
-        _logger.info("Read B from Q1");
-        expect("B", consumer1.receive(1000));
-        _logger.info("Read C from Q1");
-        expect("C", consumer1.receive(1000));
-
-        // commit
-        _logger.info("session commit");
-        session.commit();
-        _logger.info("Start test Connection");
-        testCon.start();
-
-        // ensure sent messages can be received and received messages are gone
-        _logger.info("Read X from Q2");
-        expect("X", testConsumer2.receive(1000));
-        _logger.info("Read Y from Q2");
-        expect("Y", testConsumer2.receive(1000));
-        _logger.info("Read Z from Q2");
-        expect("Z", testConsumer2.receive(1000));
-
-        _logger.info("create test session on Q1");
-        testConsumer1 = testSession.createConsumer(queue1);
-        _logger.info("Read null from Q1");
-        assertTrue(null == testConsumer1.receive(1000));
-        _logger.info("Read null from Q2");
-        assertTrue(null == testConsumer2.receive(1000));
+        try
+        {
+// add some messages
+            _logger.info("Send prep A");
+            prepProducer1.send(prepSession.createTextMessage("A"));
+            _logger.info("Send prep B");
+            prepProducer1.send(prepSession.createTextMessage("B"));
+            _logger.info("Send prep C");
+            prepProducer1.send(prepSession.createTextMessage("C"));
+
+            // send and receive some messages
+            _logger.info("Send X to Q2");
+            producer2.send(session.createTextMessage("X"));
+            _logger.info("Send Y to Q2");
+            producer2.send(session.createTextMessage("Y"));
+            _logger.info("Send Z to Q2");
+            producer2.send(session.createTextMessage("Z"));
+
+            _logger.info("Read A from Q1");
+            expect("A", consumer1.receive(1000));
+            _logger.info("Read B from Q1");
+            expect("B", consumer1.receive(1000));
+            _logger.info("Read C from Q1");
+            expect("C", consumer1.receive(1000));
+
+            // commit
+            _logger.info("session commit");
+            session.commit();
+            _logger.info("Start test Connection");
+            testCon.start();
+
+            // ensure sent messages can be received and received messages are gone
+            _logger.info("Read X from Q2");
+            expect("X", testConsumer2.receive(1000));
+            _logger.info("Read Y from Q2");
+            expect("Y", testConsumer2.receive(1000));
+            _logger.info("Read Z from Q2");
+            expect("Z", testConsumer2.receive(1000));
+
+            _logger.info("create test session on Q1");
+            testConsumer1 = testSession.createConsumer(queue1);
+            _logger.info("Read null from Q1");
+            assertTrue(null == testConsumer1.receive(1000));
+            _logger.info("Read null from Q2");
+            assertTrue(null == testConsumer2.receive(1000));
+        }
+        catch (Throwable e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
     }
 
     public void testRollback() throws Exception
     {
-        // add some messages
-        _logger.info("Send prep RB_A");
-        prepProducer1.send(prepSession.createTextMessage("RB_A"));
-        _logger.info("Send prep RB_B");
-        prepProducer1.send(prepSession.createTextMessage("RB_B"));
-        _logger.info("Send prep RB_C");
-        prepProducer1.send(prepSession.createTextMessage("RB_C"));
-
-        _logger.info("Sending RB_X RB_Y RB_Z");
-        producer2.send(session.createTextMessage("RB_X"));
-        producer2.send(session.createTextMessage("RB_Y"));
-        producer2.send(session.createTextMessage("RB_Z"));
-        _logger.info("Receiving RB_A RB_B");
-        expect("RB_A", consumer1.receive(1000));
-        expect("RB_B", consumer1.receive(1000));
-        // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
-        // Quick sleep to ensure 'RB_C' gets pre-fetched
-        Thread.sleep(500);
-
-        // rollback
-        _logger.info("rollback");
-        session.rollback();
-
-        _logger.info("Receiving RB_A RB_B RB_C");
-        // ensure sent messages are not visible and received messages are requeued
-        expect("RB_A", consumer1.receive(1000), true);
-        expect("RB_B", consumer1.receive(1000), true);
-        expect("RB_C", consumer1.receive(1000), true);
-
-        _logger.info("Starting new connection");
-        testCon.start();
-        testConsumer1 = testSession.createConsumer(queue1);
-        _logger.info("Testing we have no messages left");
-        assertTrue(null == testConsumer1.receive(1000));
-        assertTrue(null == testConsumer2.receive(1000));
-
-        session.commit();
-
-        _logger.info("Testing we have no messages left after commit");
-        assertTrue(null == testConsumer1.receive(1000));
-        assertTrue(null == testConsumer2.receive(1000));
+        try
+        {
+// add some messages
+            _logger.info("Send prep RB_A");
+            prepProducer1.send(prepSession.createTextMessage("RB_A"));
+            _logger.info("Send prep RB_B");
+            prepProducer1.send(prepSession.createTextMessage("RB_B"));
+            _logger.info("Send prep RB_C");
+            prepProducer1.send(prepSession.createTextMessage("RB_C"));
+
+            _logger.info("Sending RB_X RB_Y RB_Z");
+            producer2.send(session.createTextMessage("RB_X"));
+            producer2.send(session.createTextMessage("RB_Y"));
+            producer2.send(session.createTextMessage("RB_Z"));
+            _logger.info("Receiving RB_A RB_B");
+            expect("RB_A", consumer1.receive(1000));
+            expect("RB_B", consumer1.receive(1000));
+            // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+            // Quick sleep to ensure 'RB_C' gets pre-fetched
+            Thread.sleep(500);
+
+            // rollback
+            _logger.info("rollback");
+            session.rollback();
+
+            _logger.info("Receiving RB_A RB_B RB_C");
+            // ensure sent messages are not visible and received messages are requeued
+            expect("RB_A", consumer1.receive(1000), true);
+            expect("RB_B", consumer1.receive(1000), true);
+            expect("RB_C", consumer1.receive(1000), true);
+
+            _logger.info("Starting new connection");
+            testCon.start();
+            testConsumer1 = testSession.createConsumer(queue1);
+            _logger.info("Testing we have no messages left");
+            assertTrue(null == testConsumer1.receive(1000));
+            assertTrue(null == testConsumer2.receive(1000));
+
+            session.commit();
+
+            _logger.info("Testing we have no messages left after commit");
+            assertTrue(null == testConsumer1.receive(1000));
+            assertTrue(null == testConsumer2.receive(1000));
+        }
+        catch (Throwable e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
     }
 
     public void testResendsMsgsAfterSessionClose() throws Exception
     {
-        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+        try
+        {
+            AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
 
-        Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
-        AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
-        MessageConsumer consumer = consumerSession.createConsumer(queue3);
+            Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+            AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
+            MessageConsumer consumer = consumerSession.createConsumer(queue3);
 
-        AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
-        Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
-        MessageProducer producer = producerSession.createProducer(queue3);
+            AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+            Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = producerSession.createProducer(queue3);
 
-        _logger.info("Sending four messages");
-        producer.send(producerSession.createTextMessage("msg1"));
-        producer.send(producerSession.createTextMessage("msg2"));
-        producer.send(producerSession.createTextMessage("msg3"));
-        producer.send(producerSession.createTextMessage("msg4"));
+            _logger.info("Sending four messages");
+            producer.send(producerSession.createTextMessage("msg1"));
+            producer.send(producerSession.createTextMessage("msg2"));
+            producer.send(producerSession.createTextMessage("msg3"));
+            producer.send(producerSession.createTextMessage("msg4"));
 
-        producerSession.commit();
+            producerSession.commit();
 
-        _logger.info("Starting connection");
-        con.start();
-        TextMessage tm = (TextMessage) consumer.receive();
-        assertNotNull(tm);
-        assertEquals("msg1", tm.getText());
+            _logger.info("Starting connection");
+            con.start();
+            TextMessage tm = (TextMessage) consumer.receive();
+            assertNotNull(tm);
+            assertEquals("msg1", tm.getText());
 
-        consumerSession.commit();
+            consumerSession.commit();
 
-        _logger.info("Received and committed first message");
-        tm = (TextMessage) consumer.receive(1000);
-        assertNotNull(tm);
-        assertEquals("msg2", tm.getText());
+            _logger.info("Received and committed first message");
+            tm = (TextMessage) consumer.receive(1000);
+            assertNotNull(tm);
+            assertEquals("msg2", tm.getText());
 
-        tm = (TextMessage) consumer.receive(1000);
-        assertNotNull(tm);
-        assertEquals("msg3", tm.getText());
+            tm = (TextMessage) consumer.receive(1000);
+            assertNotNull(tm);
+            assertEquals("msg3", tm.getText());
 
-        tm = (TextMessage) consumer.receive(1000);
-        assertNotNull(tm);
-        assertEquals("msg4", tm.getText());
+            tm = (TextMessage) consumer.receive(1000);
+            assertNotNull(tm);
+            assertEquals("msg4", tm.getText());
 
-        _logger.info("Received all four messages. Closing connection with three outstanding messages");
+            _logger.info("Received all four messages. Closing connection with three outstanding messages");
 
-        consumerSession.close();
+            consumerSession.close();
 
-        consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+            consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
 
-        consumer = consumerSession.createConsumer(queue3);
+            consumer = consumerSession.createConsumer(queue3);
 
-        // no ack for last three messages so when I call recover I expect to get three messages back
-        tm = (TextMessage) consumer.receive(3000);
-        assertNotNull(tm);
-        assertEquals("msg2", tm.getText());
-        assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+            // no ack for last three messages so when I call recover I expect to get three messages back
+            tm = (TextMessage) consumer.receive(3000);
+            assertNotNull(tm);
+            assertEquals("msg2", tm.getText());
+            assertTrue("Message is not redelivered", tm.getJMSRedelivered());
 
-        tm = (TextMessage) consumer.receive(3000);
-        assertNotNull(tm);
-        assertEquals("msg3", tm.getText());
-        assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+            tm = (TextMessage) consumer.receive(3000);
+            assertNotNull(tm);
+            assertEquals("msg3", tm.getText());
+            assertTrue("Message is not redelivered", tm.getJMSRedelivered());
 
-        tm = (TextMessage) consumer.receive(3000);
-        assertNotNull(tm);
-        assertEquals("msg4", tm.getText());
-        assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+            tm = (TextMessage) consumer.receive(3000);
+            assertNotNull(tm);
+            assertEquals("msg4", tm.getText());
+            assertTrue("Message is not redelivered", tm.getJMSRedelivered());
 
-        _logger.info("Received redelivery of three messages. Committing");
+            _logger.info("Received redelivery of three messages. Committing");
 
-        consumerSession.commit();
+            consumerSession.commit();
 
-        _logger.info("Called commit");
+            _logger.info("Called commit");
 
-        tm = (TextMessage) consumer.receive(1000);
-        assertNull(tm);
+            tm = (TextMessage) consumer.receive(1000);
+            assertNull(tm);
 
-        _logger.info("No messages redelivered as is expected");
+            _logger.info("No messages redelivered as is expected");
 
-        con.close();
-        con2.close();
+            con.close();
+            con2.close();
+        }
+        catch (Throwable e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
     }
 
     private void expect(String text, Message msg) throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java Fri Sep 28 08:40:21 2007
@@ -38,6 +38,7 @@
  */
 public class QpidTestCase extends TestCase
 {
+   
     /* this clas logger */
     private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
 
@@ -46,6 +47,7 @@
     private static final String BROKER_PATH = "broker_path";
     private static final String BROKER_PARAM = "broker_param";
     private static final String BROKER_VM = "vm";
+    private static final String EXT_BROKER = "ext" ;
     /**
      * The process where the remote broker is running.
      */
@@ -79,18 +81,18 @@
         {
             _brokerParams = System.getProperties().getProperty(BROKER_PARAM);
         }
-        if (!_shel.equals(BROKER_VM))
+        if (!_shel.equals(BROKER_VM) && ! _shel.equals(EXT_BROKER) )
         {
             // start a new broker
             startBroker();
         }
-        else
+        else if ( ! _shel.equals(EXT_BROKER) )
         {
             // create an in_VM broker
             TransportConnection.createVMBroker(1);
         }
-        System.out.println("=========================================");
-        System.out.println("= " + _shel + " " + _brokerPath + " " + _brokerParams);
+       _logger.info("=========================================");
+        _logger.info("= " + _shel + " " + _brokerPath + " " + _brokerParams);
     }
 
     /**
@@ -100,17 +102,18 @@
      */
     protected void tearDown() throws Exception
     {
-        super.tearDown();
         _logger.info("Kill broker");
         if (_brokerProcess != null)
         {
             // destroy the currently running broker
             _brokerProcess.destroy();
+            _brokerProcess = null;
         }
         else
         {
             TransportConnection.killAllVMBrokers();
         }
+         super.tearDown();
     }
 
     //--------- Util method 
@@ -130,6 +133,11 @@
         {
             //bad, we had an error starting the broker
             throw new Exception("Problem when starting the broker: " + reader.readLine());
+        }
+        // We need to wait for the broker to start; ideally we would ping it.
+        synchronized(this)
+        {
+            this.wait(1000);
         }
     }