You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/09/04 19:42:38 UTC

svn commit: r572751 - in /incubator/qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/AMQSession.java test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java

Author: rhs
Date: Tue Sep  4 10:42:38 2007
New Revision: 572751

URL: http://svn.apache.org/viewvc?rev=572751&view=rev
Log:
fixed a race condition in rollback() that leads to intermittant failures of TransactedTest, also modified TransactedTest to be slightly more robust

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=572751&r1=572750&r2=572751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Sep  4 10:42:38 2007
@@ -109,6 +109,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  *
@@ -221,6 +222,11 @@
      */
     private final FlowControllingBlockingQueue _queue;
 
+    /**
+     * Holds the highest received delivery tag.
+     */
+    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+
     /** Holds the dispatcher thread for this session. */
     private Dispatcher _dispatcher;
 
@@ -1281,6 +1287,7 @@
         }
         else
         {
+            _highestDeliveryTag.set(message.getDeliverBody().deliveryTag);
             _queue.add(message);
         }
     }
@@ -2553,6 +2560,7 @@
         private final AtomicBoolean _closed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
+        private final AtomicLong _rollbackMark = new AtomicLong(-1);
 
         public Dispatcher()
         {
@@ -2609,7 +2617,7 @@
                     setConnectionStopped(true);
                 }
 
-                rejectAllMessages(true);
+                _rollbackMark.set(_highestDeliveryTag.get());
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
@@ -2645,7 +2653,7 @@
             // Allow disptacher to start stopped
             synchronized (_lock)
             {
-                while (connectionStopped())
+                while (!_closed.get() && connectionStopped())
                 {
                     try
                     {
@@ -2670,14 +2678,16 @@
                             _lock.wait();
                         }
 
-                        synchronized (_messageDeliveryLock)
+                        if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
                         {
-                            dispatchMessage(message);
+                            rejectMessage(message, true);
                         }
-
-                        while (connectionStopped())
+                        else
                         {
-                            _lock.wait();
+                            synchronized (_messageDeliveryLock)
+                            {
+                                dispatchMessage(message);
+                            }
                         }
 
                     }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?rev=572751&r1=572750&r2=572751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Tue Sep  4 10:42:38 2007
@@ -171,34 +171,33 @@
     public void testRollback() throws Exception
     {
         // add some messages
-        _logger.info("Send prep A");
-        prepProducer1.send(prepSession.createTextMessage("A"));
-        _logger.info("Send prep B");
-        prepProducer1.send(prepSession.createTextMessage("B"));
-        _logger.info("Send prep C");
-        prepProducer1.send(prepSession.createTextMessage("C"));
+        _logger.info("Send prep RB_A");
+        prepProducer1.send(prepSession.createTextMessage("RB_A"));
+        _logger.info("Send prep RB_B");
+        prepProducer1.send(prepSession.createTextMessage("RB_B"));
+        _logger.info("Send prep RB_C");
+        prepProducer1.send(prepSession.createTextMessage("RB_C"));
 
-        // Quick sleep to ensure all three get pre-fetched
+        _logger.info("Sending RB_X RB_Y RB_Z");
+        producer2.send(session.createTextMessage("RB_X"));
+        producer2.send(session.createTextMessage("RB_Y"));
+        producer2.send(session.createTextMessage("RB_Z"));
+        _logger.info("Receiving RB_A RB_B");
+        expect("RB_A", consumer1.receive(1000));
+        expect("RB_B", consumer1.receive(1000));
+        // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+        // Quick sleep to ensure 'RB_C' gets pre-fetched
         Thread.sleep(500);
 
-        _logger.info("Sending X Y Z");
-        producer2.send(session.createTextMessage("X"));
-        producer2.send(session.createTextMessage("Y"));
-        producer2.send(session.createTextMessage("Z"));
-        _logger.info("Receiving A B");
-        expect("A", consumer1.receive(1000));
-        expect("B", consumer1.receive(1000));
-        // Don't consume 'C' leave it in the prefetch cache to ensure rollback removes it.
-
         // rollback
         _logger.info("rollback");
         session.rollback();
 
-        _logger.info("Receiving A B C");
+        _logger.info("Receiving RB_A RB_B RB_C");
         // ensure sent messages are not visible and received messages are requeued
-        expect("A", consumer1.receive(1000), true);
-        expect("B", consumer1.receive(1000), true);
-        expect("C", consumer1.receive(1000), true);
+        expect("RB_A", consumer1.receive(1000), true);
+        expect("RB_B", consumer1.receive(1000), true);
+        expect("RB_C", consumer1.receive(1000), true);
 
         _logger.info("Starting new connection");
         testCon.start();