You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/09/14 22:39:05 UTC
svn commit: r575788 - in /incubator/qpid/branches/M2.1/java/client/src:
main/java/org/apache/qpid/client/AMQSession.java
test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
Author: rhs
Date: Fri Sep 14 13:39:05 2007
New Revision: 575788
URL: http://svn.apache.org/viewvc?rev=575788&view=rev
Log:
Merged revision 572751 from the trunk, this fixes QPID-573.
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=575788&r1=575787&r2=575788&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Sep 14 13:39:05 2007
@@ -108,6 +108,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
*
@@ -220,6 +221,11 @@
*/
private final FlowControllingBlockingQueue _queue;
+ /**
+ * Holds the highest received delivery tag.
+ */
+ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+
/** Holds the dispatcher thread for this session. */
private Dispatcher _dispatcher;
@@ -1278,6 +1284,7 @@
}
else
{
+ _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
_queue.add(message);
}
}
@@ -2558,6 +2565,7 @@
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
+ private final AtomicLong _rollbackMark = new AtomicLong(-1);
public Dispatcher()
{
@@ -2614,7 +2622,7 @@
setConnectionStopped(true);
}
- rejectAllMessages(true);
+ _rollbackMark.set(_highestDeliveryTag.get());
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -2650,7 +2658,7 @@
// Allow disptacher to start stopped
synchronized (_lock)
{
- while (connectionStopped())
+ while (!_closed.get() && connectionStopped())
{
try
{
@@ -2675,14 +2683,16 @@
_lock.wait();
}
- synchronized (_messageDeliveryLock)
+ if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
{
- dispatchMessage(message);
+ rejectMessage(message, true);
}
-
- while (connectionStopped())
+ else
{
- _lock.wait();
+ synchronized (_messageDeliveryLock)
+ {
+ dispatchMessage(message);
+ }
}
}
Modified: incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=575788&r1=575787&r2=575788&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Fri Sep 14 13:39:05 2007
@@ -171,34 +171,33 @@
public void testRollback() throws Exception
{
// add some messages
- _logger.info("Send prep A");
- prepProducer1.send(prepSession.createTextMessage("A"));
- _logger.info("Send prep B");
- prepProducer1.send(prepSession.createTextMessage("B"));
- _logger.info("Send prep C");
- prepProducer1.send(prepSession.createTextMessage("C"));
+ _logger.info("Send prep RB_A");
+ prepProducer1.send(prepSession.createTextMessage("RB_A"));
+ _logger.info("Send prep RB_B");
+ prepProducer1.send(prepSession.createTextMessage("RB_B"));
+ _logger.info("Send prep RB_C");
+ prepProducer1.send(prepSession.createTextMessage("RB_C"));
- // Quick sleep to ensure all three get pre-fetched
+ _logger.info("Sending RB_X RB_Y RB_Z");
+ producer2.send(session.createTextMessage("RB_X"));
+ producer2.send(session.createTextMessage("RB_Y"));
+ producer2.send(session.createTextMessage("RB_Z"));
+ _logger.info("Receiving RB_A RB_B");
+ expect("RB_A", consumer1.receive(1000));
+ expect("RB_B", consumer1.receive(1000));
+ // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+ // Quick sleep to ensure 'RB_C' gets pre-fetched
Thread.sleep(500);
- _logger.info("Sending X Y Z");
- producer2.send(session.createTextMessage("X"));
- producer2.send(session.createTextMessage("Y"));
- producer2.send(session.createTextMessage("Z"));
- _logger.info("Receiving A B");
- expect("A", consumer1.receive(1000));
- expect("B", consumer1.receive(1000));
- // Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it.
-
// rollback
_logger.info("rollback");
session.rollback();
- _logger.info("Receiving A B C");
+ _logger.info("Receiving RB_A RB_B RB_C");
// ensure sent messages are not visible and received messages are requeued
- expect("A", consumer1.receive(1000), true);
- expect("B", consumer1.receive(1000), true);
- expect("C", consumer1.receive(1000), true);
+ expect("RB_A", consumer1.receive(1000), true);
+ expect("RB_B", consumer1.receive(1000), true);
+ expect("RB_C", consumer1.receive(1000), true);
_logger.info("Starting new connection");
testCon.start();