You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/16 11:51:05 UTC
svn commit: r496658 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/AMQChannel.java
client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
Author: rgreig
Date: Tue Jan 16 02:51:04 2007
New Revision: 496658
URL: http://svn.apache.org/viewvc?view=rev&rev=496658
Log: (empty)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=496658&r1=496657&r2=496658
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jan 16 02:51:04 2007
@@ -311,6 +311,7 @@
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
+ _txnContext.commit();
}
private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=496658&r1=496657&r2=496658
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Tue Jan 16 02:51:04 2007
@@ -23,7 +23,12 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
+import org.apache.log4j.Logger;
import javax.jms.*;
@@ -47,10 +52,12 @@
private Session testSession;
private MessageConsumer testConsumer1;
private MessageConsumer testConsumer2;
+ private static final Logger _logger = Logger.getLogger(TransactedTest.class);
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
queue1 = new AMQQueue("Q1", false);
queue2 = new AMQQueue("Q2", false);
@@ -86,6 +93,7 @@
con.close();
testCon.close();
prepCon.close();
+ TransportConnection.killAllVMBrokers();
super.tearDown();
}
@@ -132,6 +140,82 @@
assertTrue(null == testConsumer2.receive(1000));
}
+ public void testResendsMsgsAfterSessionClose() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ AMQQueue queue3 = new AMQQueue("Q3", false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue3);
+ // force a synchronous exchange declare so that the consumer's queue is bound before any messages are sent
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue3);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ producerSession.commit();
+
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+
+ tm.acknowledge();
+ consumerSession.commit();
+
+ _logger.info("Received and acknowledged first message");
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ _logger.info("Received all four messages. Closing connection with three outstanding messages");
+
+ consumerSession.close();
+
+ consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+ consumer = consumerSession.createConsumer(queue3);
+
+ // no ack for the last three messages, so after closing the session and consuming from a new one I expect those three messages to be redelivered
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+
+ _logger.info("Received redelivery of three messages. Acknowledging last message");
+ tm.acknowledge();
+ consumerSession.commit();
+ _logger.info("Calling acknowledge with no outstanding messages");
+ // all acked so no messages to be delivered
+
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ con.close();
+ con2.close();
+
+ }
+
+
private void expect(String text, Message msg) throws JMSException
{
assertTrue(msg instanceof TextMessage);
@@ -140,6 +224,6 @@
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+ return new junit.framework.TestSuite(TransactedTest.class);
}
}