You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/02/20 15:48:29 UTC

svn commit: r746259 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/ack/ main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/store/ main/java/org/apache/qpid/server/subscrip...

Author: ritchiem
Date: Fri Feb 20 14:48:28 2009
New Revision: 746259

URL: http://svn.apache.org/viewvc?rev=746259&view=rev
Log:
QPID-1632 - Initial testing of reference counting and implementation of TransactionLog based reference counting

Added:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 20 14:48:28 2009
@@ -251,7 +251,7 @@
         }
         catch (NoRouteException e)
         {
-            //_currentMessage.incrementReference();
+            //_currentMessage.takeReference();
             _returnMessages.add(e);
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Fri Feb 20 14:48:28 2009
@@ -61,7 +61,8 @@
         // and so will have the ref decremented as routing fails.
         // we need to keep this message around so we can return it in the
         // handler. So increment here.
-        _amqMessage = payload.takeReference();
+        payload.incrementReference(1);
+        _amqMessage = payload;
 
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java Fri Feb 20 14:48:28 2009
@@ -130,7 +130,7 @@
         //in memory (persistent changes will be rolled back by store)
         for (QueueEntry msg : _unacked.values())
         {
-            msg.getMessage().takeReference();
+            msg.getMessage().incrementReference(1);
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 20 14:48:28 2009
@@ -126,9 +126,5 @@
 
     boolean incrementReference(int queueCount);
 
-    boolean incrementReference();
-
-    AMQMessage takeReference();
-
     boolean isReferenced();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri Feb 20 14:48:28 2009
@@ -78,6 +78,10 @@
                            final AMQProtocolSession publisher,
                            TransactionLog messasgeStore)
     {
+        if (publisher == null)
+        {
+            throw new NullPointerException("Message Publisher cannot be null");
+        }
         _messagePublishInfo = info;
         _txnContext = txnContext;
         _publisher = publisher;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageFactory.java Fri Feb 20 14:48:28 2009
@@ -42,19 +42,19 @@
         _messageId = new AtomicLong(0L);
     }
 
-    public void start()
+    public void recoveryComplete()
     {
         _state = State.OPEN;
     }
 
     /**
-     * Only used by test as test suite is run in a single VM we need to beable to re-enable recovery mode.
-     */    
-    protected void enableRecover()
+     * Only to be used by tests, as this violates the principle that message IDs should not be reused.
+     */
+    public void reset()
     {
         _state = State.RECOVER;
+        _messageId = new AtomicLong(0L);
     }
-
     
     /**
      * Normal message creation path

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PersistentAMQMessage.java Fri Feb 20 14:48:28 2009
@@ -60,6 +60,7 @@
     @Override
     public void removeMessage(StoreContext storeContext) throws AMQException
     {
+        _log.info("PAMQM : removing message:" + _messageId);
         _transactionLog.removeMessage(storeContext, _messageId);
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Feb 20 14:48:28 2009
@@ -301,10 +301,16 @@
 
     public void dispose(final StoreContext storeContext) throws MessageCleanupException
     {
+        _log.info("QEI Disposing of message:" + getMessage().getMessageId() + ": state=" + _state);
         if(delete())
         {
+            _log.info("QEI delete message:" + getMessage().getMessageId());
             getMessage().decrementReference(storeContext);
         }
+        else
+        {
+            _log.info("QEI delete state wrong:" + getMessage().getMessageId());
+        }
     }
 
     public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Feb 20 14:48:28 2009
@@ -918,7 +918,7 @@
                 {
                     if (!entry.isDeleted())
                     {
-                        return entry.getMessage().incrementReference();
+                        return entry.getMessage().incrementReference(1);
                     }
                 }
 
@@ -1418,7 +1418,7 @@
         }
     }
 
-    @Override
+    
     public void checkMessageStatus() throws AMQException
     {
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java Fri Feb 20 14:48:28 2009
@@ -32,19 +32,17 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.StoreContext;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Collections;
-import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * A deliverable message.
- */
+/** A deliverable message. */
 public class TransientAMQMessage implements AMQMessage
 {
     /** Used for debugging purposes. */
-    private static final Logger _log = Logger.getLogger(AMQMessage.class);
+    protected static final Logger _log = Logger.getLogger(AMQMessage.class);
 
     private final AtomicInteger _referenceCount = new AtomicInteger(1);
 
@@ -58,8 +56,6 @@
 
     protected final Long _messageId;
 
-
-
     /** Flag to indicate that this message requires 'immediate' delivery. */
 
     private static final byte IMMEDIATE = 0x01;
@@ -143,23 +139,23 @@
     /**
      * Used by SimpleAMQQueueTest, TxAckTest.TestMessage, AbstractHeaderExchangeTestBase.Message
      * These all need refactoring to some sort of MockAMQMessageFactory.
-     */ 
+     */
     @Deprecated
     protected TransientAMQMessage(AMQMessage message) throws AMQException
     {
         _messageId = message.getMessageId();
-        _flags = ((TransientAMQMessage)message)._flags;
+        _flags = ((TransientAMQMessage) message)._flags;
         _contentHeaderBody = message.getContentHeaderBody();
         _messagePublishInfo = message.getMessagePublishInfo();
     }
 
-
     /**
      * Normal message creation via the MessageFactory uses this constructor
      * Package scope limited as MessageFactory should be used
-     * @see MessageFactory
      *
      * @param messageId
+     *
+     * @see MessageFactory
      */
     TransientAMQMessage(Long messageId)
     {
@@ -191,7 +187,6 @@
         return new BodyContentIterator();
     }
 
-
     public ContentHeaderBody getContentHeaderBody()
     {
         return _contentHeaderBody;
@@ -202,32 +197,19 @@
         return _messageId;
     }
 
-    /**
-     * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
-     * operation.
-     */
-    public AMQMessage takeReference()
-    {
-        incrementReference(); // _referenceCount.incrementAndGet();
-
-        return this;
-    }
-
-    public boolean incrementReference()
-    {
-        return incrementReference(1);
-    }
-
     /* Threadsafe. Increment the reference count on the message. */
     public boolean incrementReference(int count)
     {
-        if(_referenceCount.addAndGet(count) <= 1)
+        if (_referenceCount.addAndGet(count) <= 1)
         {
-            _referenceCount.addAndGet(-count);
+            int newcount = _referenceCount.addAndGet(-count);
+            _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") to :" + newcount);
             return false;
         }
         else
         {
+            _log.debug("Message(" + _messageId + ") Incremented Ref count by (" + count + ") but count was <=1("
+                       + _referenceCount.get() + ")");
             return true;
         }
 
@@ -247,6 +229,8 @@
 
         int count = _referenceCount.decrementAndGet();
 
+        _log.debug("Message(" + _messageId + ") Decremented Ref count to :" + count);
+
         // note that the operation of decrementing the reference count and then removing the message does not
         // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
         // the message has been passed to all queues. i.e. we are
@@ -256,10 +240,11 @@
             // set the reference count way below 0 so that we can detect that the message has been deleted
             // this is to guard against the message being spontaneously recreated (from the mgmt console)
             // by copying from other queues at the same time as it is being removed.
-            _referenceCount.set(Integer.MIN_VALUE/2);
+            _referenceCount.set(Integer.MIN_VALUE / 2);
 
             try
             {
+                _log.debug("Reference Count hit 0, removing message");
                 // must check if the handle is null since there may be cases where we decide to throw away a message
                 // and the handle has not yet been constructed
                 // no need to perform persistent check anymore as TransientAMQM.removeMessage() is a no-op
@@ -268,7 +253,7 @@
             catch (AMQException e)
             {
                 // to maintain consistency, we revert the count
-                incrementReference();
+                incrementReference(1);
                 throw new MessageCleanupException(getMessageId(), e);
             }
         }
@@ -282,7 +267,6 @@
         }
     }
 
-
     /**
      * Called selectors to determin if the message has already been sent
      *
@@ -296,10 +280,10 @@
     /**
      * Called to enforce the 'immediate' flag.
      *
-     * @returns  true if the message is marked for immediate delivery but has not been marked as delivered
-     *                              to a consumer
+     * @return true if the message is marked for immediate delivery but has not been marked as delivered
+     * to a consumer
      */
-    public boolean immediateAndNotDelivered() 
+    public boolean immediateAndNotDelivered()
     {
 
         return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
@@ -335,7 +319,6 @@
         _flags |= DELIVERED_TO_CONSUMER;
     }
 
-
     public long getSize()
     {
         return _contentHeaderBody.bodySize;
@@ -345,7 +328,7 @@
     {
         return _sessionIdentifier.getSessionInstance();
     }
-                                                                                          
+
     public Object getPublisherIdentifier()
     {
         return _sessionIdentifier.getSessionIdentifier();
@@ -356,7 +339,7 @@
         _sessionIdentifier = sessionIdentifier;
     }
 
-    /** From AMQMessageHandle **/
+    /** From AMQMessageHandle * */
 
     public int getBodyCount()
     {
@@ -365,7 +348,7 @@
 
     public ContentChunk getContentChunk(int index)
     {
-        if(_contentBodies == null)
+        if (_contentBodies == null)
         {
             throw new RuntimeException("No ContentBody has been set");
         }
@@ -381,9 +364,9 @@
     public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody)
             throws AMQException
     {
-        if(_contentBodies == null)
+        if (_contentBodies == null)
         {
-            if(isLastContentBody)
+            if (isLastContentBody)
             {
                 _contentBodies = Collections.singletonList(contentChunk);
             }
@@ -411,9 +394,10 @@
 
     /**
      * This is called when all the content has been received.
+     *
      * @param storeContext
-     *@param messagePublishInfo
-     * @param contentHeaderBody @throws AMQException
+     * @param messagePublishInfo
+     * @param contentHeaderBody  @throws AMQException
      */
     public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo,
                                                ContentHeaderBody contentHeaderBody)
@@ -425,7 +409,7 @@
             throw new NullPointerException("HeaderBody cannot be null");
         }
 
-        if( messagePublishInfo == null)
+        if (messagePublishInfo == null)
         {
             throw new NullPointerException("PublishInfo cannot be null");
         }
@@ -433,15 +417,14 @@
         _messagePublishInfo = messagePublishInfo;
         _contentHeaderBody = contentHeaderBody;
 
-
-        if( contentHeaderBody.bodySize == 0)
+        if (contentHeaderBody.bodySize == 0)
         {
             _contentBodies = Collections.EMPTY_LIST;
-        }       
+        }
 
         _arrivalTime = System.currentTimeMillis();
 
-        if(messagePublishInfo.isImmediate())
+        if (messagePublishInfo.isImmediate())
         {
             _flags |= IMMEDIATE;
         }
@@ -457,7 +440,6 @@
         //no-op
     }
 
-
     public String toString()
     {
         // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Fri Feb 20 14:48:28 2009
@@ -1357,7 +1357,7 @@
 
                 if(message != null)
                 {
-                    message.incrementReference();
+                    message.incrementReference(1);
                 }
                 else
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Fri Feb 20 14:48:28 2009
@@ -594,6 +594,7 @@
     protected void sendToClient(final QueueEntry entry, final long deliveryTag)
             throws AMQException
     {
+        _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity());
         _deliveryMethod.deliverToClient(this,entry,deliveryTag);
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Fri Feb 20 14:48:28 2009
@@ -92,7 +92,7 @@
         public void process() throws AMQException
         {
 
-            _message.incrementReference();
+            _message.incrementReference(1);
             try
             {
                 QueueEntry entry = _queue.enqueue(getStoreContext(),_message);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Feb 20 14:48:28 2009
@@ -238,6 +238,13 @@
                                          " does not.");
         }
         _transactionLog = (TransactionLog) o;
+
+        //Assign RoutingTable as old MessageStores converted to TransactionLog may require the _routingTable.
+        if (_transactionLog instanceof RoutingTable)
+        {
+            _routingTable = (RoutingTable)_transactionLog;
+        }
+
         _transactionLog.configure(this, "store", config);
     }
 
@@ -261,9 +268,9 @@
         }
         else
         {
-            if (_transactionLog instanceof RoutingTable)
+            if (_routingTable == null)
             {
-                _routingTable = (RoutingTable)_transactionLog;
+                throw new RuntimeException("No Routing Table configured unable to startup.");
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Fri Feb 20 14:48:28 2009
@@ -242,9 +242,9 @@
         }
 
 
-        public boolean incrementReference()
+        public boolean incrementReference(int count)
         {
-            _count++;
+            _count+=count;
             return true;
         }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageFactoryRecoveryTest.java Fri Feb 20 14:48:28 2009
@@ -29,16 +29,15 @@
     public void setUp()
     {
         _factory = MessageFactory.getInstance();
-
+        _factory.reset();
     }
 
     public void test()
     {
-        AMQMessage message = _factory.createMessage(null, false);
-
-        _factory.enableRecover();
 
-        Long messasgeID = message.getMessageId();
+        Long messasgeID = 1L;
+        //Create initial message
+        _factory.createMessage(messasgeID, null);
 
         try
         {
@@ -67,7 +66,7 @@
         messasgeID += 100;
         try
         {
-            message = _factory.createMessage(messasgeID, null);
+            AMQMessage message = _factory.createMessage(messasgeID, null);
             assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
         }
         catch (Exception re)
@@ -76,7 +75,7 @@
         }
 
         // End the reovery process.
-        _factory.start();
+        _factory.recoveryComplete();
 
         //Check we cannot still create by id after ending recovery phase
         try
@@ -96,7 +95,7 @@
 
         try
         {
-            message = _factory.createMessage(null, false);
+            AMQMessage message = _factory.createMessage(null, false);
             assertEquals("Factory assigned incorrect id.", messasgeID, message.getMessageId());
         }
         catch (Exception re)

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java?rev=746259&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MessageReferenceCountingTest.java Fri Feb 20 14:48:28 2009
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+
+public class MessageReferenceCountingTest extends TestCase
+{
+    AMQMessage _message;
+
+    public void setUp()
+    {
+        _message = MessageFactory.getInstance().createMessage(null, false);
+    }
+
+    public void testInitialState()
+    {
+
+        assertTrue("New messages should have a reference", _message.isReferenced());
+    }
+
+    public void testIncrementReference()
+    {
+        assertTrue("Message should maintain Referenced state", _message.isReferenced());
+        assertTrue("Incrementing should be allowed ",_message.incrementReference(1));
+        assertTrue("Message should maintain Referenced state", _message.isReferenced());
+        assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(1));
+        assertTrue("Message should maintain Referenced state", _message.isReferenced());
+        assertTrue("Incrementing should be allowed as much as we need",_message.incrementReference(2));
+        assertTrue("Message should maintain Referenced state", _message.isReferenced());
+    }
+
+    public void testDecrementReference()
+    {
+        assertTrue("Message should maintain Referenced state", _message.isReferenced());
+        try
+        {
+            _message.decrementReference(null);
+        }
+        catch (MessageCleanupException e)
+        {
+            fail("Decrement should be allowed:"+e.getMessage());
+        }
+
+        assertFalse("Message should not be Referenced state", _message.isReferenced());
+
+        try
+        {
+            _message.decrementReference(null);
+            fail("Decrement should not be allowed as we should have a ref count of 0");
+        }
+        catch (MessageCleanupException e)
+        {
+            assertTrue("Incorrect exception thrown.",e.getMessage().contains("has gone below 0"));
+        }
+
+    }
+
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Fri Feb 20 14:48:28 2009
@@ -20,193 +20,22 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.subscription.Subscription;
-
-public class MockQueueEntry implements QueueEntry
+public class MockQueueEntry extends QueueEntryImpl
 {
+    static SimpleQueueEntryList _defaultList = new SimpleQueueEntryList(new MockAMQQueue("MockQueueEntry_DefaultQueue"));
 
-    private AMQMessage _message;
-    private boolean _redelivered;
-
-    public boolean acquire()
-    {
-        return false;
-    }
-
-    public boolean acquire(Subscription sub)
-    {
-        return false;
-    }
-
-    public boolean acquiredBySubscription()
-    {
-        return false;
-    }
-
-    public void addStateChangeListener(StateChangeListener listener)
-    {
-
-    }
-
-    public String debugIdentity()
-    {
-        return null;
-    }
-
-    public boolean delete()
-    {
-        return false;
-    }
-
-    public void dequeue(StoreContext storeContext) throws FailedDequeueException
-    {
-
-    }
-
-    public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
-    {
-
-    }
-
-    public void dispose(StoreContext storeContext) throws MessageCleanupException
-    {
-
-    }
-
-    public boolean expired() throws AMQException
-    {
-        return false;
-    }
-
-    public Subscription getDeliveredSubscription()
-    {
-        return null;
-    }
-
-    public boolean getDeliveredToConsumer()
-    {
-        return false;
-    }
-
-    public AMQMessage getMessage()
-    {
-        return _message;
-    }
-
-    public AMQQueue getQueue()
-    {
-        return null;
-    }
-
-    public long getSize()
-    {
-        return 0;
-    }
-
-    public boolean immediateAndNotDelivered()
-    {
-        return false;
-    }
-
-    public boolean isAcquired()
-    {
-        return false;
-    }
-
-    public boolean isDeleted()
-    {
-        return false;
-    }
-
-    
-    public boolean isQueueDeleted()
-    {
-
-        return false;
-    }
-
-    
-    public boolean isRejectedBy(Subscription subscription)
-    {
-
-        return false;
-    }
-
-    
-    public void reject()
-    {
-
-
-    }
-
-    
-    public void reject(Subscription subscription)
-    {
-
-
-    }
-
-    
-    public void release()
-    {
-
-
-    }
-
-    
-    public boolean removeStateChangeListener(StateChangeListener listener)
-    {
-
-        return false;
-    }
-
-    
-    public void requeue(StoreContext storeContext) throws AMQException
-    {
-
-
-    }
-
-    
-    public void setDeliveredToSubscription()
-    {
-
-
-    }
-
-    
-    public void setRedelivered(boolean redelivered)
-    {
-         _redelivered = redelivered;
-    }
-
-    
-    public int compareTo(QueueEntry o)
-    {
-
-        return 0;
-    }
-
-    public void setMessage(AMQMessage msg)
-    {
-        _message = msg;
-    }
-
-    public ContentHeaderBody getContentHeaderBody() throws AMQException
+    public MockQueueEntry()
     {
-        return _message.getContentHeaderBody();
+        super(_defaultList);
     }
 
-    public boolean isPersistent() throws AMQException
+    public MockQueueEntry(SimpleQueueEntryList queueEntryList, AMQMessage message)
     {
-        return _message.isPersistent();
+        super(queueEntryList, message);
     }
 
-    public boolean isRedelivered()
+    public MockQueueEntry(AMQMessage message)
     {
-        return _redelivered;
+        super(_defaultList, message);
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Feb 20 14:48:28 2009
@@ -296,7 +296,7 @@
     public void testGetLastFiveMessageIds() throws Exception
     {
         AMQMessage message = createMessage();
-        Long messageIdOffset = message.getMessageId() -1 ;
+        Long messageIdOffset = message.getMessageId() - 1;
         for (int i = 0; i < 10; i++)
         {
             // Put message on queue
@@ -335,7 +335,6 @@
         msg.enqueue(qs);
         msg.routingComplete(_store);
 
-
         _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
 
         // Check that it is enqueued
@@ -343,14 +342,13 @@
         assertNotNull(data);
 
         // Dequeue message
-        MockQueueEntry entry = new MockQueueEntry();
 
         ContentHeaderBody header = new ContentHeaderBody();
         header.bodySize = MESSAGE_SIZE;
         AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
         message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
 
-        entry.setMessage(message);
+        MockQueueEntry entry = new MockQueueEntry(message);
         _queue.dequeue(null, entry);
 
         // Check that it is dequeued
@@ -408,9 +406,9 @@
             _tag = getMessageId();
         }
 
-        public boolean incrementReference()
+        public boolean incrementReference(int count)
         {
-            _count++;
+            _count+=count;
             return true;
         }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=746259&r1=746258&r2=746259&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Feb 20 14:48:28 2009
@@ -59,7 +59,7 @@
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
 
-        message = message.takeReference();
+        message.incrementReference(1);
 
         // we call routing complete to set up the handle
  //       message.routingComplete(_store, _storeContext, new MessageHandleFactory());
@@ -89,10 +89,10 @@
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
         
-        message = message.takeReference();
+        message.incrementReference(1);
 
         assertEquals(1, _store.getMessageMetaDataMap().size());
-        message = message.takeReference();
+        message.incrementReference(1);
         message.decrementReference(_storeContext);
         assertEquals(1, _store.getMessageMetaDataMap().size());
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org