You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/10/06 18:41:01 UTC

svn commit: r1179699 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/AMQSession.java systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java

Author: robbie
Date: Thu Oct  6 16:41:01 2011
New Revision: 1179699

URL: http://svn.apache.org/viewvc?rev=1179699&view=rev
Log:
QPID-3525: stop the client from accidentally consuming the messages it is actually trying to recover when using 0-8/9/9-1. Also stops it acking the message from the in-progress onMessage delivery after Session.recover() was called.

Applied patch from Oleksandr Rudyy<or...@gmail.com> and myself.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1179699&r1=1179698&r2=1179699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct  6 16:41:01 2011
@@ -362,7 +362,13 @@ public abstract class AMQSession<C exten
      * Set when recover is called. This is to handle the case where recover() is called by application code during
      * onMessage() processing to ensure that an auto ack is not sent.
      */
-    private boolean _inRecovery;
+    private volatile boolean _sessionInRecovery;
+
+    /**
+     * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
+     * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
+     */
+    private volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped. */
     private boolean _connectionStopped;
@@ -1703,8 +1709,8 @@ public abstract class AMQSession<C exten
             // flush any acks we are holding in the buffer.
             flushAcknowledgments();
 
-            // this is set only here, and the before the consumer's onMessage is called it is set to false
-            _inRecovery = true;
+            // this is only set true here, and only set false when the consumers preDeliver method is called
+            _sessionInRecovery = true;
 
             boolean isSuspended = isSuspended();
 
@@ -1712,9 +1718,18 @@ public abstract class AMQSession<C exten
             {
                 suspendChannel(true);
             }
-            
+
+            // Set to true to short circuit delivery of anything currently
+            //in the pre-dispatch queue.
+            _usingDispatcherForCleanup = true;
+
             syncDispatchQueue();
-            
+
+            // Set to false before sending the recover as 0-8/9/9-1 will
+            //send messages back before the recover completes, and we
+            //probably shouldn't clean those! ;-)
+            _usingDispatcherForCleanup = false;
+
             if (_dispatcher != null)
             {
                 _dispatcher.recover();
@@ -1723,10 +1738,7 @@ public abstract class AMQSession<C exten
             sendRecover();
             
             markClean();
-            
-            // Set inRecovery to false before you start message flow again again.            
-            _inRecovery = false; 
-            
+
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -2126,7 +2138,7 @@ public abstract class AMQSession<C exten
 
     boolean isInRecovery()
     {
-        return _inRecovery;
+        return _sessionInRecovery;
     }
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2248,7 +2260,7 @@ public abstract class AMQSession<C exten
 
     void setInRecovery(boolean inRecovery)
     {
-        _inRecovery = inRecovery;
+        _sessionInRecovery = inRecovery;
     }
 
     boolean isStarted()
@@ -3325,7 +3337,7 @@ public abstract class AMQSession<C exten
                 {
                     rejectMessage(message, true);
                 }
-                else if (isInRecovery())
+                else if (_usingDispatcherForCleanup)
                 {
                     _unacknowledgedMessageTags.add(deliveryTag);            
                 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=1179699&r1=1179698&r2=1179699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java Thu Oct  6 16:41:01 2011
@@ -46,7 +46,7 @@ public class RecoverTest extends Failove
 {
     static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
 
-    private Exception _error;
+    private volatile Exception _error;
     private AtomicInteger count;
 
     protected AMQConnection _connection;
@@ -249,14 +249,13 @@ public class RecoverTest extends Failove
                     {
                         if (!message.getJMSRedelivered())
                         {
-                            setError(
-                                    new Exception("Message not marked as redelivered on what should be second delivery attempt"));
+                            setError(new Exception("Message not marked as redelivered on what should be second delivery attempt"));
                         }
                     }
                     else
                     {
-                        System.err.println(message);
-                        fail("Message delivered too many times!: " + count);
+                        _logger.error(message.toString());
+                        setError(new Exception("Message delivered too many times!: " + count));
                     }
                 }
                 catch (JMSException e)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org