You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/15 16:39:40 UTC
svn commit: r518667 - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/queue/
client/src/test/java/org/apache/qpid/test/unit/ba...
Author: rgreig
Date: Thu Mar 15 08:39:39 2007
New Revision: 518667
URL: http://svn.apache.org/viewvc?view=rev&rev=518667
Log:
Short pause to help ensure connection.close comes after last message ack, added to PropertyValueTest
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Thu Mar 15 08:39:39 2007
@@ -36,12 +36,14 @@
public RequiredDeliveryException(String message, AMQMessage payload)
{
super(message);
- _amqMessage = payload;
+
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
- // handler. So increment here.
- payload.incrementReference();
+ // handler. So increment here.
+ _amqMessage = payload.takeReference();
+
+ //payload.incrementReference();
}
public AMQMessage getAMQMessage()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Thu Mar 15 08:39:39 2007
@@ -116,7 +116,7 @@
for (UnacknowledgedMessage msg : _unacked)
{
msg.clearTransientMessageData();
- msg.message.incrementReference();
+ msg.message.takeReference();
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Mar 15 08:39:39 2007
@@ -355,15 +355,22 @@
return _messageId;
}
+ /**
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+ */
+ public AMQMessage takeReference()
+ {
+ _referenceCount.incrementAndGet();
+ return this;
+ }
+
/** Threadsafe. Increment the reference count on the message. */
- public void incrementReference()
+ protected void incrementReference()
{
_referenceCount.incrementAndGet();
if (_log.isDebugEnabled())
{
-
_log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-
}
}
@@ -377,11 +384,13 @@
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
+ int count = _referenceCount.decrementAndGet();
+
// note that the operation of decrementing the reference count and then removing the message does not
// have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
// the message has been passed to all queues. i.e. we are
// not relying on the all the increments having taken place before the delivery manager decrements.
- if (_referenceCount.decrementAndGet() == 0)
+ if (count == 0)
{
try
{
@@ -408,13 +417,13 @@
{
if (_log.isDebugEnabled())
{
- _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
- if (_referenceCount.get() < 0)
+ _log.debug("Decremented ref count is now " + count + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
+ if (count < 0)
{
Thread.dumpStack();
}
}
- if (_referenceCount.get() < 0)
+ if (count < 0)
{
throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
}
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Thu Mar 15 08:39:39 2007
@@ -117,6 +117,8 @@
waitFor(count);
check();
_logger.info("Completed without failure");
+
+ Thread.sleep(10);
_connection.close();
_logger.error("End Run Number:" + (run - 1));
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Thu Mar 15 08:39:39 2007
@@ -80,7 +80,8 @@
AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
- message.incrementReference();
+ message = message.takeReference();
+
// we call routing complete to set up the handle
message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
@@ -128,11 +129,12 @@
info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
- message.incrementReference();
+
+ message = message.takeReference();
// we call routing complete to set up the handle
message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
- message.incrementReference();
+ message = message.takeReference();
message.decrementReference(_storeContext);
assertTrue(_store.getMessageMetaDataMap().size() == 1);
}