You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/15 16:39:40 UTC

svn commit: r518667 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/queue/ client/src/test/java/org/apache/qpid/test/unit/ba...

Author: rgreig
Date: Thu Mar 15 08:39:39 2007
New Revision: 518667

URL: http://svn.apache.org/viewvc?view=rev&rev=518667
Log:
Added a short pause to PropertyValueTest to help ensure connection.close comes after the last message ack.

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Thu Mar 15 08:39:39 2007
@@ -36,12 +36,14 @@
     public RequiredDeliveryException(String message, AMQMessage payload)
     {
         super(message);
-        _amqMessage = payload;
+
         // Increment the reference as this message is in the routing phase
         // and so will have the ref decremented as routing fails.
         // we need to keep this message around so we can return it in the
-        // handler. So increment here. 
-        payload.incrementReference();
+        // handler. So increment here.  
+	_amqMessage = payload.takeReference();
+ 
+        //payload.incrementReference();
     }
 
     public AMQMessage getAMQMessage()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Thu Mar 15 08:39:39 2007
@@ -116,7 +116,7 @@
         for (UnacknowledgedMessage msg : _unacked)
         {
             msg.clearTransientMessageData();
-            msg.message.incrementReference();
+            msg.message.takeReference();
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Mar 15 08:39:39 2007
@@ -355,15 +355,22 @@
         return _messageId;
     }
 
+    /**
+     * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+     */
+    public AMQMessage takeReference()
+    {
+        _referenceCount.incrementAndGet();
+	return this;
+    }
+
     /** Threadsafe. Increment the reference count on the message. */
-    public void incrementReference()
+    protected void incrementReference()
     {
         _referenceCount.incrementAndGet();
         if (_log.isDebugEnabled())
         {
-
             _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-
         }
     }
 
@@ -377,11 +384,13 @@
      */
     public void decrementReference(StoreContext storeContext) throws MessageCleanupException
     {
+        int count = _referenceCount.decrementAndGet();
+
         // note that the operation of decrementing the reference count and then removing the message does not
         // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
         // the message has been passed to all queues. i.e. we are
         // not relying on the all the increments having taken place before the delivery manager decrements.
-        if (_referenceCount.decrementAndGet() == 0)
+        if (count == 0)
         {
             try
             {
@@ -408,13 +417,13 @@
         {
             if (_log.isDebugEnabled())
             {
-                _log.debug("Decremented ref count is now " + _referenceCount + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
-                if (_referenceCount.get() < 0)
+                _log.debug("Decremented ref count is now " + count + " for message id " + debugIdentity() + "\n" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 5));
+                if (count < 0)
                 {
                     Thread.dumpStack();
                 }
             }
-            if (_referenceCount.get() < 0)
+            if (count < 0)
             {
                 throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
             }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Thu Mar 15 08:39:39 2007
@@ -117,6 +117,8 @@
                 waitFor(count);
                 check();
                 _logger.info("Completed without failure");
+
+	        Thread.sleep(10);
                 _connection.close();
 
                 _logger.error("End Run Number:" + (run - 1));

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=518667&r1=518666&r2=518667
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Thu Mar 15 08:39:39 2007
@@ -80,7 +80,8 @@
         AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
                                             new NonTransactionalContext(_store, _storeContext, null, null, null),
                                             createPersistentContentHeader());
-        message.incrementReference();
+        message = message.takeReference();
+
         // we call routing complete to set up the handle
         message.routingComplete(_store, _storeContext, new MessageHandleFactory());
         assertTrue(_store.getMessageMetaDataMap().size() == 1);
@@ -128,11 +129,12 @@
                                             info,
                                             new NonTransactionalContext(_store, _storeContext, null, null, null),
                                             createPersistentContentHeader());
-        message.incrementReference();
+        
+        message = message.takeReference();
         // we call routing complete to set up the handle
         message.routingComplete(_store, _storeContext, new MessageHandleFactory());
         assertTrue(_store.getMessageMetaDataMap().size() == 1);
-        message.incrementReference();
+        message = message.takeReference();
         message.decrementReference(_storeContext);
         assertTrue(_store.getMessageMetaDataMap().size() == 1);
     }