You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/28 17:40:23 UTC
svn commit: r580389 - in
/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid:
test/unit/basic/ test/unit/topic/ test/unit/transacted/ testutil/
Author: arnaudsimon
Date: Fri Sep 28 08:40:21 2007
New Revision: 580389
URL: http://svn.apache.org/viewvc?rev=580389&view=rev
Log: (empty)
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java Fri Sep 28 08:40:21 2007
@@ -21,7 +21,6 @@
package org.apache.qpid.test.unit.basic;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
@@ -29,6 +28,7 @@
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.testutil.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@
import java.util.Iterator;
import java.util.List;
-public class TextMessageTest extends TestCase implements MessageListener
+public class TextMessageTest extends QpidTestCase implements MessageListener
{
private static final Logger _logger = LoggerFactory.getLogger(TextMessageTest.class);
@@ -62,7 +62,7 @@
super.setUp();
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+ init((AMQConnection) getConnection("guest", "guest"));
}
catch (Exception e)
{
@@ -89,7 +89,15 @@
_session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// set up a slow consumer
- _session.createConsumer(destination).setMessageListener(this);
+ try
+ {
+ _session.createConsumer(destination).setMessageListener(this);
+ }
+ catch (Throwable e)
+ {
+// TODO
+ e.printStackTrace();
+ }
connection.start();
}
@@ -117,6 +125,7 @@
_logger.info("Sending Msg:" + m);
producer.send(m);
}
+ _logger.info("sent " + count + " mesages");
}
void waitFor(int count) throws InterruptedException
@@ -227,6 +236,7 @@
{
synchronized (received)
{
+ _logger.info("===== received one message");
received.add((JMSTextMessage) message);
received.notify();
}
@@ -237,21 +247,10 @@
return in + System.currentTimeMillis();
}
- public static void main(String[] argv) throws Exception
- {
- TextMessageTest test = new TextMessageTest();
- test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0];
- test.setUp();
- if (argv.length > 1)
- {
- test._count = Integer.parseInt(argv[1]);
- }
- test.test();
- }
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(TextMessageTest.class));
+ return new junit.framework.TestSuite(TextMessageTest.class);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Fri Sep 28 08:40:21 2007
@@ -36,30 +36,29 @@
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.testutil.QpidTestCase;
/** @author Apache Software Foundation */
-public class TopicSessionTest extends TestCase
+public class TopicSessionTest extends QpidTestCase
{
private static final String BROKER = "vm://:1";
protected void setUp() throws Exception
{
super.setUp();
- TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
super.tearDown();
- TransportConnection.killAllVMBrokers();
}
public void testTopicSubscriptionUnsubscription() throws Exception
{
- AMQConnection con = new AMQConnection(BROKER+"?retries='0'", "guest", "guest", "test", "test");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "MyTopic");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
@@ -104,7 +103,7 @@
private void subscriptionNameReuseForDifferentTopic(boolean shutdown) throws Exception
{
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
@@ -143,13 +142,13 @@
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
- AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con1, "MyTopic3");
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
- AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
+ AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
@@ -174,7 +173,7 @@
public void testTextMessageCreation() throws Exception
{
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con, "MyTopic4");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
@@ -214,7 +213,7 @@
public void testSendingSameMessage() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
@@ -237,7 +236,7 @@
public void testTemporaryTopic() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
+ AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
@@ -289,7 +288,7 @@
public void testNoLocal() throws Exception
{
- AMQConnection con = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test", "test");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con, "testNoLocal");
@@ -341,7 +340,7 @@
m = (TextMessage) noLocal.receive(100);
assertNull(m);
- AMQConnection con2 = new AMQConnection(BROKER + "?retries='0'", "guest", "guest", "test2", "test");
+ AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher2 = session2.createPublisher(topic);
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Fri Sep 28 08:40:21 2007
@@ -22,6 +22,7 @@
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.testutil.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
@@ -41,7 +42,7 @@
*
* Assumptions; - Assumes empty Queue
*/
-public class CommitRollbackTest extends TestCase
+public class CommitRollbackTest extends QpidTestCase
{
protected AMQConnection conn;
protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
@@ -54,7 +55,6 @@
Queue _jmsQueue;
private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
- private static final String BROKER = "vm://:1";
private boolean _gotone = false;
private boolean _gottwo = false;
private boolean _gottwoRedelivered = false;
@@ -62,20 +62,14 @@
protected void setUp() throws Exception
{
super.setUp();
- if (BROKER.startsWith("vm"))
- {
- TransportConnection.createVMBroker(1);
- }
-
testMethod++;
queue += testMethod;
-
newConnection();
}
- private void newConnection() throws AMQException, URLSyntaxException, JMSException
+ private void newConnection() throws Exception
{
- conn = new AMQConnection("amqp://guest:guest@client/test?brokerlist='" + BROKER + "'");
+ conn = (AMQConnection) getConnection("guest", "guest");
_session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -92,12 +86,7 @@
protected void tearDown() throws Exception
{
super.tearDown();
-
conn.close();
- if (BROKER.startsWith("vm"))
- {
- TransportConnection.killVMBroker(1);
- }
}
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Fri Sep 28 08:40:21 2007
@@ -27,6 +27,8 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
import org.apache.qpid.testutil.QpidTestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,233 +60,275 @@
protected void setUp() throws Exception
{
- super.setUp();
- _logger.info("Create Connection");
- con = (AMQConnection) getConnection("guest", "guest");
- _logger.info("Create Session");
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- _logger.info("Create Q1");
- queue1 =
- new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false,
- true);
- _logger.info("Create Q2");
- AMQQueue queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
-
- _logger.info("Create Consumer of Q1");
- consumer1 = session.createConsumer(queue1);
- // Dummy just to create the queue.
- _logger.info("Create Consumer of Q2");
- MessageConsumer consumer2 = session.createConsumer(queue2);
- _logger.info("Close Consumer of Q2");
- consumer2.close();
-
- _logger.info("Create producer to Q2");
- producer2 = session.createProducer(queue2);
-
- _logger.info("Start Connection");
- con.start();
-
- _logger.info("Create prep connection");
- prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
-
- _logger.info("Create prep session");
- prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
-
- _logger.info("Create prep producer to Q1");
- prepProducer1 = prepSession.createProducer(queue1);
-
- _logger.info("Create prep connection start");
- prepCon.start();
-
- _logger.info("Create test connection");
- testCon = (AMQConnection) getConnection("guest", "guest");
- _logger.info("Create test session");
- testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
- _logger.info("Create test consumer of q2");
- testConsumer2 = testSession.createConsumer(queue2);
+ try
+ {
+ super.setUp();
+ _logger.info("Create Connection");
+ con = (AMQConnection) getConnection("guest", "guest");
+ _logger.info("Create Session");
+ session = con.createSession(true, Session.SESSION_TRANSACTED);
+ _logger.info("Create Q1");
+ queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"),
+ new AMQShortString("Q1"), false, true);
+ _logger.info("Create Q2");
+ AMQQueue queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
+
+ _logger.info("Create Consumer of Q1");
+ consumer1 = session.createConsumer(queue1);
+ // Dummy just to create the queue.
+ _logger.info("Create Consumer of Q2");
+ MessageConsumer consumer2 = session.createConsumer(queue2);
+ _logger.info("Close Consumer of Q2");
+ consumer2.close();
+
+ _logger.info("Create producer to Q2");
+ producer2 = session.createProducer(queue2);
+
+ _logger.info("Start Connection");
+ con.start();
+
+ _logger.info("Create prep connection");
+ prepCon = (AMQConnection) getConnection("guest", "guest");
+
+ _logger.info("Create prep session");
+ prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ _logger.info("Create prep producer to Q1");
+ prepProducer1 = prepSession.createProducer(queue1);
+
+ _logger.info("Create prep connection start");
+ prepCon.start();
+
+ _logger.info("Create test connection");
+ testCon = (AMQConnection) getConnection("guest", "guest");
+ _logger.info("Create test session");
+ testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create test consumer of q2");
+ testConsumer2 = testSession.createConsumer(queue2);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ stopBroker();
+ throw e;
+ }
}
protected void tearDown() throws Exception
{
- _logger.info("Close connection");
- con.close();
- _logger.info("Close test connection");
- testCon.close();
- _logger.info("Close prep connection");
- prepCon.close();
- super.tearDown();
+ try
+ {
+ _logger.info("Close connection");
+ con.close();
+ _logger.info("Close test connection");
+ testCon.close();
+ _logger.info("Close prep connection");
+ prepCon.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ super.tearDown();
+ }
}
public void testCommit() throws Exception
{
- // add some messages
- _logger.info("Send prep A");
- prepProducer1.send(prepSession.createTextMessage("A"));
- _logger.info("Send prep B");
- prepProducer1.send(prepSession.createTextMessage("B"));
- _logger.info("Send prep C");
- prepProducer1.send(prepSession.createTextMessage("C"));
-
- // send and receive some messages
- _logger.info("Send X to Q2");
- producer2.send(session.createTextMessage("X"));
- _logger.info("Send Y to Q2");
- producer2.send(session.createTextMessage("Y"));
- _logger.info("Send Z to Q2");
- producer2.send(session.createTextMessage("Z"));
-
- _logger.info("Read A from Q1");
- expect("A", consumer1.receive(1000));
- _logger.info("Read B from Q1");
- expect("B", consumer1.receive(1000));
- _logger.info("Read C from Q1");
- expect("C", consumer1.receive(1000));
-
- // commit
- _logger.info("session commit");
- session.commit();
- _logger.info("Start test Connection");
- testCon.start();
-
- // ensure sent messages can be received and received messages are gone
- _logger.info("Read X from Q2");
- expect("X", testConsumer2.receive(1000));
- _logger.info("Read Y from Q2");
- expect("Y", testConsumer2.receive(1000));
- _logger.info("Read Z from Q2");
- expect("Z", testConsumer2.receive(1000));
-
- _logger.info("create test session on Q1");
- testConsumer1 = testSession.createConsumer(queue1);
- _logger.info("Read null from Q1");
- assertTrue(null == testConsumer1.receive(1000));
- _logger.info("Read null from Q2");
- assertTrue(null == testConsumer2.receive(1000));
+ try
+ {
+// add some messages
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
+ // send and receive some messages
+ _logger.info("Send X to Q2");
+ producer2.send(session.createTextMessage("X"));
+ _logger.info("Send Y to Q2");
+ producer2.send(session.createTextMessage("Y"));
+ _logger.info("Send Z to Q2");
+ producer2.send(session.createTextMessage("Z"));
+
+ _logger.info("Read A from Q1");
+ expect("A", consumer1.receive(1000));
+ _logger.info("Read B from Q1");
+ expect("B", consumer1.receive(1000));
+ _logger.info("Read C from Q1");
+ expect("C", consumer1.receive(1000));
+
+ // commit
+ _logger.info("session commit");
+ session.commit();
+ _logger.info("Start test Connection");
+ testCon.start();
+
+ // ensure sent messages can be received and received messages are gone
+ _logger.info("Read X from Q2");
+ expect("X", testConsumer2.receive(1000));
+ _logger.info("Read Y from Q2");
+ expect("Y", testConsumer2.receive(1000));
+ _logger.info("Read Z from Q2");
+ expect("Z", testConsumer2.receive(1000));
+
+ _logger.info("create test session on Q1");
+ testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Read null from Q1");
+ assertTrue(null == testConsumer1.receive(1000));
+ _logger.info("Read null from Q2");
+ assertTrue(null == testConsumer2.receive(1000));
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
public void testRollback() throws Exception
{
- // add some messages
- _logger.info("Send prep RB_A");
- prepProducer1.send(prepSession.createTextMessage("RB_A"));
- _logger.info("Send prep RB_B");
- prepProducer1.send(prepSession.createTextMessage("RB_B"));
- _logger.info("Send prep RB_C");
- prepProducer1.send(prepSession.createTextMessage("RB_C"));
-
- _logger.info("Sending RB_X RB_Y RB_Z");
- producer2.send(session.createTextMessage("RB_X"));
- producer2.send(session.createTextMessage("RB_Y"));
- producer2.send(session.createTextMessage("RB_Z"));
- _logger.info("Receiving RB_A RB_B");
- expect("RB_A", consumer1.receive(1000));
- expect("RB_B", consumer1.receive(1000));
- // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
- // Quick sleep to ensure 'RB_C' gets pre-fetched
- Thread.sleep(500);
-
- // rollback
- _logger.info("rollback");
- session.rollback();
-
- _logger.info("Receiving RB_A RB_B RB_C");
- // ensure sent messages are not visible and received messages are requeued
- expect("RB_A", consumer1.receive(1000), true);
- expect("RB_B", consumer1.receive(1000), true);
- expect("RB_C", consumer1.receive(1000), true);
-
- _logger.info("Starting new connection");
- testCon.start();
- testConsumer1 = testSession.createConsumer(queue1);
- _logger.info("Testing we have no messages left");
- assertTrue(null == testConsumer1.receive(1000));
- assertTrue(null == testConsumer2.receive(1000));
-
- session.commit();
-
- _logger.info("Testing we have no messages left after commit");
- assertTrue(null == testConsumer1.receive(1000));
- assertTrue(null == testConsumer2.receive(1000));
+ try
+ {
+// add some messages
+ _logger.info("Send prep RB_A");
+ prepProducer1.send(prepSession.createTextMessage("RB_A"));
+ _logger.info("Send prep RB_B");
+ prepProducer1.send(prepSession.createTextMessage("RB_B"));
+ _logger.info("Send prep RB_C");
+ prepProducer1.send(prepSession.createTextMessage("RB_C"));
+
+ _logger.info("Sending RB_X RB_Y RB_Z");
+ producer2.send(session.createTextMessage("RB_X"));
+ producer2.send(session.createTextMessage("RB_Y"));
+ producer2.send(session.createTextMessage("RB_Z"));
+ _logger.info("Receiving RB_A RB_B");
+ expect("RB_A", consumer1.receive(1000));
+ expect("RB_B", consumer1.receive(1000));
+ // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+ // Quick sleep to ensure 'RB_C' gets pre-fetched
+ Thread.sleep(500);
+
+ // rollback
+ _logger.info("rollback");
+ session.rollback();
+
+ _logger.info("Receiving RB_A RB_B RB_C");
+ // ensure sent messages are not visible and received messages are requeued
+ expect("RB_A", consumer1.receive(1000), true);
+ expect("RB_B", consumer1.receive(1000), true);
+ expect("RB_C", consumer1.receive(1000), true);
+
+ _logger.info("Starting new connection");
+ testCon.start();
+ testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Testing we have no messages left");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
+
+ session.commit();
+
+ _logger.info("Testing we have no messages left after commit");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
public void testResendsMsgsAfterSessionClose() throws Exception
{
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ try
+ {
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
- Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
- AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
- MessageConsumer consumer = consumerSession.createConsumer(queue3);
+ Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+ AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue3);
- AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
- Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSession.createProducer(queue3);
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSession.createProducer(queue3);
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
- producerSession.commit();
+ producerSession.commit();
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- assertNotNull(tm);
- assertEquals("msg1", tm.getText());
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ assertNotNull(tm);
+ assertEquals("msg1", tm.getText());
- consumerSession.commit();
+ consumerSession.commit();
- _logger.info("Received and committed first message");
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg2", tm.getText());
+ _logger.info("Received and committed first message");
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg3", tm.getText());
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg4", tm.getText());
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
- _logger.info("Received all four messages. Closing connection with three outstanding messages");
+ _logger.info("Received all four messages. Closing connection with three outstanding messages");
- consumerSession.close();
+ consumerSession.close();
- consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+ consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
- consumer = consumerSession.createConsumer(queue3);
+ consumer = consumerSession.createConsumer(queue3);
- // no ack for last three messages so when I call recover I expect to get three messages back
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg2", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ // no ack for last three messages so when I call recover I expect to get three messages back
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg3", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg4", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- _logger.info("Received redelivery of three messages. Committing");
+ _logger.info("Received redelivery of three messages. Committing");
- consumerSession.commit();
+ consumerSession.commit();
- _logger.info("Called commit");
+ _logger.info("Called commit");
- tm = (TextMessage) consumer.receive(1000);
- assertNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNull(tm);
- _logger.info("No messages redelivered as is expected");
+ _logger.info("No messages redelivered as is expected");
- con.close();
- con2.close();
+ con.close();
+ con2.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
private void expect(String text, Message msg) throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java?rev=580389&r1=580388&r2=580389&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java Fri Sep 28 08:40:21 2007
@@ -38,6 +38,7 @@
*/
public class QpidTestCase extends TestCase
{
+
/* this clas logger */
private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
@@ -46,6 +47,7 @@
private static final String BROKER_PATH = "broker_path";
private static final String BROKER_PARAM = "broker_param";
private static final String BROKER_VM = "vm";
+ private static final String EXT_BROKER = "ext" ;
/**
* The process where the remote broker is running.
*/
@@ -79,18 +81,18 @@
{
_brokerParams = System.getProperties().getProperty(BROKER_PARAM);
}
- if (!_shel.equals(BROKER_VM))
+ if (!_shel.equals(BROKER_VM) && ! _shel.equals(EXT_BROKER) )
{
// start a new broker
startBroker();
}
- else
+ else if ( ! _shel.equals(EXT_BROKER) )
{
// create an in_VM broker
TransportConnection.createVMBroker(1);
}
- System.out.println("=========================================");
- System.out.println("= " + _shel + " " + _brokerPath + " " + _brokerParams);
+ _logger.info("=========================================");
+ _logger.info("= " + _shel + " " + _brokerPath + " " + _brokerParams);
}
/**
@@ -100,17 +102,18 @@
*/
protected void tearDown() throws Exception
{
- super.tearDown();
_logger.info("Kill broker");
if (_brokerProcess != null)
{
// destroy the currently running broker
_brokerProcess.destroy();
+ _brokerProcess = null;
}
else
{
TransportConnection.killAllVMBrokers();
}
+ super.tearDown();
}
//--------- Util method
@@ -130,6 +133,11 @@
{
//bad, we had an error starting the broker
throw new Exception("Problem when starting the broker: " + reader.readLine());
+ }
+ // We need to wait for th ebroker to start ideally we would need to ping it
+ synchronized(this)
+ {
+ this.wait(1000);
}
}