You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/11 21:51:32 UTC

svn commit: r943249 - in /qpid/branches/0.5.x-dev/qpid/java: broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Author: ritchiem
Date: Tue May 11 19:51:32 2010
New Revision: 943249

URL: http://svn.apache.org/viewvc?rev=943249&view=rev
Log:
QPID-2596 : Updated QEI to restoreCredit when acquired messages are released. Updated CommitRollbackTest.

Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=943249&r1=943248&r2=943249&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 11 19:51:32 2010
@@ -178,6 +178,11 @@ public class QueueEntryImpl implements Q
 
     public void release()
     {
+        Subscription subscription = getDeliveredSubscription();
+        if (subscription != null)
+        {
+            subscription.restoreCredit(this);
+        }
         _stateUpdater.set(this,AVAILABLE_STATE);
     }
 

Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=943249&r1=943248&r2=943249&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Tue May 11 19:51:32 2010
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.test.unit.transacted;
 
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.client.AMQConnection;
 import org.slf4j.Logger;
@@ -63,12 +64,12 @@ public class CommitRollbackTest extends 
     {
         conn = (AMQConnection) getConnection("guest", "guest");
 
-        _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        _session = conn.createSession(true, Session.SESSION_TRANSACTED);
 
         _jmsQueue = _session.createQueue(queue);
         _consumer = _session.createConsumer(_jmsQueue);
 
-        _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        _pubSession = conn.createSession(true, Session.SESSION_TRANSACTED);
 
         _publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
 
@@ -500,6 +501,86 @@ public class CommitRollbackTest extends 
     }
 
     /**
+     * QPID-2596
+     *  Test that rollback works.
+     *
+     * Goal is to ensure that message credit is correctly restored.
+     * Previously rollback would result in message release() on the Java broker
+     * which would leak credit.
+     *
+     * Here we set a small pre-fetch and so small credit window and then consume
+     * a large number of messages.
+     *
+     * By filling the pre-fetch before rolling back we ensure that all the
+     * credit is used and so if we do not get any back the test will fail
+     * consistently.
+     *
+     * Using a small pre-fetch and a large backlog we can guarantee that we have
+     * filled the pre-fetch before performing rollback.
+     *
+     * Test outline.
+     *
+     *  - Connect two transacted sessions with a small pre-fetch.
+     *  - Send a large number of messages on one session
+     *  - Use second session to receive pre-fetch worth of messages
+     *  - Rollback receiver session
+     *  - Continue to consume all the messages on the receiver session,
+     *     committing every batch size.
+     *  - Fail if we can't get the message.
+     *  - End by checking all msgs are consumed
+     *
+     * @throws Exception if something unexpected occurred
+     */
+    public void testReceiveThenRollbackConsumerThenReceive() throws Exception
+    {
+        // Close connection so we can reset the pre-fetch
+        conn.close();
+
+        int MAX_PREFETCH=5;
+        int BACK_LOG_FACTOR=200;
+
+        setSystemProperty("max_prefetch", String.valueOf(MAX_PREFETCH));
+
+        // Reconnect
+        newConnection();
+
+        assertEquals("Prefetch not reset",
+                     MAX_PREFETCH,((AMQSession)_session).getDefaultPrefetch());
+
+        assertTrue("session is not transacted", _session.getTransacted());
+        assertTrue("session is not transacted", _pubSession.getTransacted());
+
+        _logger.info("Sending (" + MAX_PREFETCH * BACK_LOG_FACTOR + ")messages");
+        sendMessage(_pubSession, _publisher.getDestination(), MAX_PREFETCH * BACK_LOG_FACTOR);
+        _pubSession.commit();
+
+        for (int i=0 ;i< MAX_PREFETCH; i++)
+        {
+            assertNotNull("Received:" + i, _consumer.receive(1000));
+            _logger.info("Received:"+i);
+        }
+
+
+        _logger.info("Rolling back");
+        _session.rollback();
+
+        _logger.info("Receiving messages");
+
+        for (int b = 0; b < BACK_LOG_FACTOR; b++)
+        {
+            for (int a = 0; a < MAX_PREFETCH; a++)
+            {
+                assertNotNull("Received (" + b + ")after rollback:" + a, _consumer.receive(1000));
+            }
+            _session.commit();
+        }
+
+        Message result = _consumer.receive(500);
+        assertNull("test message was put and rolled back, but is still present", result);
+    }
+
+
+    /**
      * Qpid-1163
      * Check that when commt is called inside onMessage then
      * the last message is nor redelivered. 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org