You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/11 21:51:32 UTC
svn commit: r943249 - in /qpid/branches/0.5.x-dev/qpid/java:
broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Author: ritchiem
Date: Tue May 11 19:51:32 2010
New Revision: 943249
URL: http://svn.apache.org/viewvc?rev=943249&view=rev
Log:
QPID-2596 : Updated QEI to restoreCredit when acquired messages are released. Updated CommitRollbackTest.
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=943249&r1=943248&r2=943249&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 11 19:51:32 2010
@@ -178,6 +178,11 @@ public class QueueEntryImpl implements Q
public void release()
{
+ Subscription subscription = getDeliveredSubscription();
+ if (subscription != null)
+ {
+ subscription.restoreCredit(this);
+ }
_stateUpdater.set(this,AVAILABLE_STATE);
}
Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=943249&r1=943248&r2=943249&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Tue May 11 19:51:32 2010
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.test.unit.transacted;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
import org.slf4j.Logger;
@@ -63,12 +64,12 @@ public class CommitRollbackTest extends
{
conn = (AMQConnection) getConnection("guest", "guest");
- _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _session = conn.createSession(true, Session.SESSION_TRANSACTED);
_jmsQueue = _session.createQueue(queue);
_consumer = _session.createConsumer(_jmsQueue);
- _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _pubSession = conn.createSession(true, Session.SESSION_TRANSACTED);
_publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
@@ -500,6 +501,86 @@ public class CommitRollbackTest extends
}
/**
+ * QPID-2596
+ * Test that rollback works.
+ *
+ * Goal is to ensure that message credit is correctly restored.
+ * Previously rollback would result in message release() on the Java broker
+ * which would leak credit.
+ *
+ * Here we set a small pre-fetch and so small credit window and then consume
+ * a large number of messages.
+ *
+ * By filling the pre-fetch before rolling back we ensure that all the
+ * credit is used and so if we do not get any back the test will fail
+ * consistently.
+ *
+ * Using a large message backlog we can guarantee that we have filled the pre-fetch
+ * before performing rollback.
+ *
+ * Test outline.
+ *
+ * - Connect two transacted sessions with a small pre-fetch.
+ * - Send a large amount of messages on one session
+ * - Use second session to receive pre-fetch worth of messages
+ * - Rollback receiver session
+ * - Continue to consume all the messages on the receiver session,
+ * committing every batch size.
+ * - Fail if we can't get the message.
+ * - End by checking all msgs are consumed
+ *
+ * @throws Exception if something unexpected occurred
+ */
+ public void testReceiveThenRollbackConsumerThenReceive() throws Exception
+ {
+ // Close connection so we can reset the pre-fetch
+ conn.close();
+
+ int MAX_PREFETCH=5;
+ int BACK_LOG_FACTOR=200;
+
+ setSystemProperty("max_prefetch", String.valueOf(MAX_PREFETCH));
+
+ // Reconnect
+ newConnection();
+
+ assertEquals("Prefetch not reset",
+ MAX_PREFETCH,((AMQSession)_session).getDefaultPrefetch());
+
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("Sending (" + MAX_PREFETCH * BACK_LOG_FACTOR + ")messages");
+ sendMessage(_pubSession, _publisher.getDestination(), MAX_PREFETCH * BACK_LOG_FACTOR);
+ _pubSession.commit();
+
+ for (int i=0 ;i< MAX_PREFETCH; i++)
+ {
+ assertNotNull("Received:" + i, _consumer.receive(1000));
+ _logger.info("Received:"+i);
+ }
+
+
+ _logger.info("Rolling back");
+ _session.rollback();
+
+ _logger.info("Receiving messages");
+
+ for (int b = 0; b < BACK_LOG_FACTOR; b++)
+ {
+ for (int a = 0; a < MAX_PREFETCH; a++)
+ {
+ assertNotNull("Received (" + b + ")after rollback:" + a, _consumer.receive(1000));
+ }
+ _session.commit();
+ }
+
+ Message result = _consumer.receive(500);
+ assertNull("test message was put and rolled back, but is still present", result);
+ }
+
+
+ /**
* Qpid-1163
* Check that when commit is called inside onMessage then
* the last message is not redelivered.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org