You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/10/06 18:41:01 UTC
svn commit: r1179699 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/AMQSession.java
systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Author: robbie
Date: Thu Oct 6 16:41:01 2011
New Revision: 1179699
URL: http://svn.apache.org/viewvc?rev=1179699&view=rev
Log:
QPID-3525: stop the client from accidentally consuming the messages it is actually trying to recover when using the AMQP 0-8/0-9/0-9-1 protocols. Also stop it from acking the message from the in-progress onMessage delivery after Session.recover() has been called.
Applied patch from Oleksandr Rudyy <or...@gmail.com> and myself.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1179699&r1=1179698&r2=1179699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct 6 16:41:01 2011
@@ -362,7 +362,13 @@ public abstract class AMQSession<C exten
* Set when recover is called. This is to handle the case where recover() is called by application code during
* onMessage() processing to ensure that an auto ack is not sent.
*/
- private boolean _inRecovery;
+ private volatile boolean _sessionInRecovery;
+
+ /**
+ * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
+ * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
+ */
+ private volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -1703,8 +1709,8 @@ public abstract class AMQSession<C exten
// flush any acks we are holding in the buffer.
flushAcknowledgments();
- // this is set only here, and the before the consumer's onMessage is called it is set to false
- _inRecovery = true;
+ // this is only set true here, and only set false when the consumers preDeliver method is called
+ _sessionInRecovery = true;
boolean isSuspended = isSuspended();
@@ -1712,9 +1718,18 @@ public abstract class AMQSession<C exten
{
suspendChannel(true);
}
-
+
+ // Set to true to short circuit delivery of anything currently
+ //in the pre-dispatch queue.
+ _usingDispatcherForCleanup = true;
+
syncDispatchQueue();
-
+
+ // Set to false before sending the recover as 0-8/9/9-1 will
+ //send messages back before the recover completes, and we
+ //probably shouldn't clean those! ;-)
+ _usingDispatcherForCleanup = false;
+
if (_dispatcher != null)
{
_dispatcher.recover();
@@ -1723,10 +1738,7 @@ public abstract class AMQSession<C exten
sendRecover();
markClean();
-
- // Set inRecovery to false before you start message flow again again.
- _inRecovery = false;
-
+
if (!isSuspended)
{
suspendChannel(false);
@@ -2126,7 +2138,7 @@ public abstract class AMQSession<C exten
boolean isInRecovery()
{
- return _inRecovery;
+ return _sessionInRecovery;
}
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException
@@ -2248,7 +2260,7 @@ public abstract class AMQSession<C exten
void setInRecovery(boolean inRecovery)
{
- _inRecovery = inRecovery;
+ _sessionInRecovery = inRecovery;
}
boolean isStarted()
@@ -3325,7 +3337,7 @@ public abstract class AMQSession<C exten
{
rejectMessage(message, true);
}
- else if (isInRecovery())
+ else if (_usingDispatcherForCleanup)
{
_unacknowledgedMessageTags.add(deliveryTag);
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java?rev=1179699&r1=1179698&r2=1179699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java Thu Oct 6 16:41:01 2011
@@ -46,7 +46,7 @@ public class RecoverTest extends Failove
{
static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
- private Exception _error;
+ private volatile Exception _error;
private AtomicInteger count;
protected AMQConnection _connection;
@@ -249,14 +249,13 @@ public class RecoverTest extends Failove
{
if (!message.getJMSRedelivered())
{
- setError(
- new Exception("Message not marked as redelivered on what should be second delivery attempt"));
+ setError(new Exception("Message not marked as redelivered on what should be second delivery attempt"));
}
}
else
{
- System.err.println(message);
- fail("Message delivered too many times!: " + count);
+ _logger.error(message.toString());
+ setError(new Exception("Message delivered too many times!: " + count));
}
}
catch (JMSException e)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org