You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/16 11:51:05 UTC

svn commit: r496658 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/AMQChannel.java client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java

Author: rgreig
Date: Tue Jan 16 02:51:04 2007
New Revision: 496658

URL: http://svn.apache.org/viewvc?view=rev&rev=496658
Log: (empty)

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=496658&r1=496657&r2=496658
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jan 16 02:51:04 2007
@@ -311,6 +311,7 @@
         _txnContext.rollback();
         unsubscribeAllConsumers(session);
         requeue();
+		_txnContext.commit();
     }
 
     private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=496658&r1=496657&r2=496658
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Tue Jan 16 02:51:04 2007
@@ -23,7 +23,12 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
+import org.apache.log4j.Logger;
 
 import javax.jms.*;
 
@@ -47,10 +52,12 @@
     private Session testSession;
     private MessageConsumer testConsumer1;
     private MessageConsumer testConsumer2;
+    private static final Logger _logger = Logger.getLogger(TransactedTest.class);
 
     protected void setUp() throws Exception
     {
         super.setUp();
+        TransportConnection.createVMBroker(1);
         queue1 = new AMQQueue("Q1", false);
         queue2 = new AMQQueue("Q2", false);
 
@@ -86,6 +93,7 @@
         con.close();
         testCon.close();
         prepCon.close();
+        TransportConnection.killAllVMBrokers();
         super.tearDown();
     }
 
@@ -132,6 +140,82 @@
         assertTrue(null == testConsumer2.receive(1000));
     }
 
+    public void testResendsMsgsAfterSessionClose() throws Exception
+    {
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+        Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        AMQQueue queue3 = new AMQQueue("Q3", false);
+        MessageConsumer consumer = consumerSession.createConsumer(queue3);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue3);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        producerSession.commit();
+
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+
+        tm.acknowledge();
+        consumerSession.commit();
+
+        _logger.info("Received and acknowledged first message");
+        tm = (TextMessage) consumer.receive(1000);
+        assertNotNull(tm);
+        tm = (TextMessage) consumer.receive(1000);
+        assertNotNull(tm);
+        tm = (TextMessage) consumer.receive(1000);
+        assertNotNull(tm);
+        _logger.info("Received all four messages. Closing connection with three outstanding messages");
+
+        consumerSession.close();
+
+        consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+        consumer = consumerSession.createConsumer(queue3);
+
+        // no ack for the last three messages, so after closing the session and creating a new consumer I expect all three to be redelivered
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertNotNull(tm);
+        assertEquals("msg2", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertNotNull(tm);
+        assertEquals("msg3", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertNotNull(tm);
+        assertEquals("msg4", tm.getText());
+
+        _logger.info("Received redelivery of three messages. Acknowledging last message");
+        tm.acknowledge();
+        consumerSession.commit();
+        _logger.info("Calling acknowledge with no outstanding messages");
+        // all acked so no messages to be delivered
+
+
+        tm = (TextMessage) consumer.receiveNoWait();
+        assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+
+        con.close();
+        con2.close();
+
+    }
+
+
     private void expect(String text, Message msg) throws JMSException
     {
         assertTrue(msg instanceof TextMessage);
@@ -140,6 +224,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+        return new junit.framework.TestSuite(TransactedTest.class);
     }
 }