You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/08 15:15:36 UTC

svn commit: r1616742 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/...

Author: rgodfrey
Date: Fri Aug  8 13:15:35 2014
New Revision: 1616742

URL: http://svn.apache.org/r1616742
Log:
QPID-3978 : [Java Broker] Allow for acquired messages to be removed from a queue due to TTL or management actions

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Fri Aug  8 13:15:35 2014
@@ -29,6 +29,8 @@ public interface ConsumerTarget
 {
 
 
+    void acquisitionRemoved(MessageInstance node);
+
     enum State
     {
         ACTIVE, SUSPENDED, CLOSED

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Fri Aug  8 13:15:35 2014
@@ -51,6 +51,8 @@ public interface MessageInstance
 
     boolean isAcquiredBy(ConsumerImpl consumer);
 
+    boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+
     void setRedelivered();
 
     boolean isRedelivered();
@@ -67,6 +69,10 @@ public interface MessageInstance
 
     boolean acquire(ConsumerImpl sub);
 
+    boolean lockAcquisition();
+
+    boolean unlockAcquisition();
+
     int getMaximumDeliveryCount();
 
     int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
@@ -99,6 +105,7 @@ public interface MessageInstance
             State currentState = getState();
             return currentState == State.DEQUEUED || currentState == State.DELETED;
         }
+
     }
 
 
@@ -162,10 +169,12 @@ public interface MessageInstance
     public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
     {
         private final C _consumer;
+        private final LockedAcquiredState<C> _lockedState;
 
         public ConsumerAcquiredState(C consumer)
         {
             _consumer = consumer;
+            _lockedState = new LockedAcquiredState<>(this);
         }
 
 
@@ -183,6 +192,43 @@ public interface MessageInstance
         {
             return "{" + getState().name() + " : " + _consumer +"}";
         }
+
+        public LockedAcquiredState<C> getLockedState()
+        {
+            return _lockedState;
+        }
+
+    }
+
+    public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+    {
+        private final ConsumerAcquiredState<C> _acquiredState;
+
+        public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+        {
+            _acquiredState = acquiredState;
+        }
+
+        @Override
+        public State getState()
+        {
+            return State.ACQUIRED;
+        }
+
+        public C getConsumer()
+        {
+            return _acquiredState.getConsumer();
+        }
+
+        public String toString()
+        {
+            return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+        }
+
+        public ConsumerAcquiredState<C> getUnlockedState()
+        {
+            return _acquiredState;
+        }
     }
 
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug  8 13:15:35 2014
@@ -1164,6 +1164,10 @@ public abstract class AbstractQueue<X ex
                     else
                     {
                         deliverMessage(sub, entry, false);
+                        if(sub.acquires())
+                        {
+                            entry.unlockAcquisition();
+                        }
                     }
                 }
             }
@@ -2001,6 +2005,10 @@ public abstract class AbstractQueue<X ex
                         else
                         {
                             deliverMessage(sub, node, batch);
+                            if(sub.acquires())
+                            {
+                                node.unlockAcquisition();
+                            }
                         }
 
                     }
@@ -2253,14 +2261,28 @@ public abstract class AbstractQueue<X ex
             if (!node.isDeleted())
             {
                 // If the node has expired then acquire it
-                if (node.expired() && node.acquire())
+                if (node.expired())
                 {
-                    if (_logger.isDebugEnabled())
+                    boolean acquiredForDequeueing = node.acquire();
+                    if(!acquiredForDequeueing && node.getDeliveredToConsumer())
+                    {
+                        QueueConsumer consumer = (QueueConsumer) node.getDeliveredConsumer();
+                        acquiredForDequeueing = node.removeAcquisitionFromConsumer(consumer);
+                        if(acquiredForDequeueing)
+                        {
+                            consumer.acquisitionRemoved(node);
+                        }
+                    }
+
+                    if(acquiredForDequeueing)
                     {
-                        _logger.debug("Dequeuing expired node " + node);
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("Dequeuing expired node " + node);
+                        }
+                        // Then dequeue it.
+                        dequeueEntry(node);
                     }
-                    // Then dequeue it.
-                    dequeueEntry(node);
                 }
                 else
                 {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Aug  8 13:15:35 2014
@@ -39,6 +39,8 @@ public interface QueueConsumer<X extends
 
     void send(QueueEntry entry, boolean batch);
 
+    void acquisitionRemoved(QueueEntry node);
+
     void queueDeleted();
 
     SubFlushRunner getRunner();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Aug  8 13:15:35 2014
@@ -477,6 +477,13 @@ class QueueConsumerImpl
     }
 
     @Override
+    public void acquisitionRemoved(final QueueEntry node)
+    {
+        _target.acquisitionRemoved(node);
+        _queue.decrementUnackedMsgCount(node);
+    }
+
+    @Override
     public String getDistributionMode()
     {
         return _distributionMode;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug  8 13:15:35 2014
@@ -210,7 +210,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquire(ConsumerImpl sub)
     {
-        final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
+        final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState().getLockedState());
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -218,17 +218,57 @@ public abstract class QueueEntryImpl imp
         return acquired;
     }
 
+    @Override
+    public boolean lockAcquisition()
+    {
+        EntryState state = _state;
+        if(state instanceof ConsumerAcquiredState)
+        {
+            return _stateUpdater.compareAndSet(this, state, ((ConsumerAcquiredState)state).getLockedState());
+        }
+        return state instanceof LockedAcquiredState;
+    }
+
+    @Override
+    public boolean unlockAcquisition()
+    {
+        EntryState state = _state;
+        if(state instanceof LockedAcquiredState)
+        {
+            return _stateUpdater.compareAndSet(this, state, ((LockedAcquiredState)state).getUnlockedState());
+        }
+        return false;
+    }
+
     public boolean acquiredByConsumer()
     {
 
-        return (_state instanceof ConsumerAcquiredState);
+        return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
     }
 
+    @Override
     public boolean isAcquiredBy(ConsumerImpl consumer)
     {
         EntryState state = _state;
-        return state instanceof ConsumerAcquiredState
-               && ((ConsumerAcquiredState)state).getConsumer() == consumer;
+        return (state instanceof ConsumerAcquiredState
+               && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+                || (state instanceof LockedAcquiredState
+                    && ((LockedAcquiredState)state).getConsumer() == consumer);
+    }
+
+    @Override
+    public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+    {
+        EntryState state = _state;
+        if(state instanceof ConsumerAcquiredState
+               && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+        {
+            return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+        }
+        else
+        {
+            return false;
+        }
     }
 
     public void release()
@@ -238,7 +278,7 @@ public abstract class QueueEntryImpl imp
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
 
-            if(state instanceof ConsumerAcquiredState)
+            if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount(this);
             }
@@ -268,6 +308,10 @@ public abstract class QueueEntryImpl imp
         {
             return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
         }
+        else if (state instanceof LockedAcquiredState)
+        {
+            return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
+        }
         else
         {
             return null;
@@ -312,7 +356,7 @@ public abstract class QueueEntryImpl imp
 
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
         {
-            if (state instanceof ConsumerAcquiredState)
+            if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount(this);
             }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Aug  8 13:15:35 2014
@@ -68,6 +68,8 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.stats.StatisticsCounter;
@@ -953,15 +955,26 @@ public abstract class AbstractVirtualHos
 
         op.withinTransaction(new Transaction()
         {
-            public void dequeue(final MessageInstance entry)
+            public void dequeue(final MessageInstance messageInstance)
             {
-                if(entry.acquire())
+                boolean acquired = messageInstance.acquire();
+                if(!acquired && messageInstance instanceof QueueEntry)
+                {
+                    QueueEntry entry = (QueueEntry) messageInstance;
+                    QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer();
+                    acquired = messageInstance.removeAcquisitionFromConsumer(consumer);
+                    if(acquired)
+                    {
+                        consumer.acquisitionRemoved((QueueEntry)messageInstance);
+                    }
+                }
+                if(acquired)
                 {
-                    txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
+                    txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action()
                     {
                         public void postCommit()
                         {
-                            entry.delete();
+                            messageInstance.delete();
                         }
 
                         public void onRollback()

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Fri Aug  8 13:15:35 2014
@@ -181,6 +181,12 @@ public class MockConsumer implements Con
 
     }
 
+    @Override
+    public void acquisitionRemoved(final MessageInstance node)
+    {
+
+    }
+
     public State getState()
     {
         return _state;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Fri Aug  8 13:15:35 2014
@@ -65,6 +65,12 @@ public class MockMessageInstance impleme
         return false;
     }
 
+    @Override
+    public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+    {
+        return false;
+    }
+
     public void delete()
     {
 
@@ -81,6 +87,18 @@ public class MockMessageInstance impleme
         return false;
     }
 
+    @Override
+    public boolean lockAcquisition()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean unlockAcquisition()
+    {
+        return false;
+    }
+
     public boolean isAvailable()
     {
         return false;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Aug  8 13:15:35 2014
@@ -137,6 +137,40 @@ public abstract class QueueEntryImplTest
         return consumer;
     }
 
+
+    public void testLocking()
+    {
+        QueueConsumer consumer = newConsumer();
+        QueueConsumer consumer2 = newConsumer();
+
+        _queueEntry.acquire(consumer);
+        assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+                   _queueEntry.isAcquired());
+
+        assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+        assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+        assertFalse("Acquisition should not be able to be removed from the wrong consumer",_queueEntry.removeAcquisitionFromConsumer(consumer2));
+        assertTrue("Acquisition should be able to be removed once unlocked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+        assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired());
+        assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer());
+
+        _queueEntry.release();
+
+        assertFalse("Hijacked queue entry should be able to be released", _queueEntry.isAcquired());
+
+        _queueEntry.acquire(consumer);
+        assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
+                   _queueEntry.isAcquired());
+
+        assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+        assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
+        assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition());
+        assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+
+        _queueEntry.delete();
+        assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted());
+    }
+
     /**
      * A helper method to get entry state
      *

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Fri Aug  8 13:15:35 2014
@@ -342,5 +342,17 @@ public class StandardQueueTest extends A
                 return super.acquire(sub);
             }
         }
+
+        @Override
+        public boolean lockAcquisition()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean unlockAcquisition()
+        {
+            return true;
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Aug  8 13:15:35 2014
@@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends
         return _stopped.get();
     }
 
-    public void acknowledge(MessageInstance entry)
+    public boolean deleteAcquired(MessageInstance entry)
     {
-        // TODO Fix Store Context / cleanup
         if(entry.isAcquiredBy(getConsumer()))
         {
-            _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
-            _unacknowledgedCount.decrementAndGet();
+            acquisitionRemoved(entry);
             entry.delete();
+            return true;
         }
+        else
+        {
+            return false;
+        }
+    }
+
+    @Override
+    public void acquisitionRemoved(final MessageInstance entry)
+    {
+        _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
+        _unacknowledgedCount.decrementAndGet();
     }
 
     public void flush()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Fri Aug  8 13:15:35 2014
@@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onAccept()
     {
-        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
         {
             _target.getSessionModel().acknowledge(_target, _entry);
         }
         else
         {
-            _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+            _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed");
         }
 
     }
@@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeLis
         }
         else
         {
-            _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+            _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed");
         }
     }
 
@@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeLis
         }
         else
         {
-            _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+            _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed");
         }
 
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Fri Aug  8 13:15:35 2014
@@ -29,6 +29,7 @@ public class MessageAcceptCompletionList
     private final ConsumerTarget_0_10 _sub;
     private final MessageInstance _entry;
     private final ServerSession _session;
+    private long _messageSize;
     private boolean _restoreCredit;
 
     public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
@@ -38,15 +39,19 @@ public class MessageAcceptCompletionList
         _entry = entry;
         _session = session;
         _restoreCredit = restoreCredit;
+        if(restoreCredit)
+        {
+            _messageSize = entry.getMessage().getSize();
+        }
     }
 
     public void onComplete(Method method)
     {
         if(_restoreCredit)
         {
-            _sub.restoreCredit(_entry.getMessage());
+            _sub.getCreditManager().restoreCredit(1l, _messageSize);
         }
-        if(_entry.isAcquiredBy(_sub.getConsumer()))
+        if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
         {
             _session.acknowledge(_sub, _entry);
         }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Aug  8 13:15:35 2014
@@ -460,7 +460,7 @@ public class ServerSession extends Sessi
 
                                  public void postCommit()
                                  {
-                                     sub.acknowledge(entry);
+                                     sub.deleteAcquired(entry);
                                  }
 
                                  public void onRollback()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Aug  8 13:15:35 2014
@@ -1413,7 +1413,11 @@ public class AMQChannel<T extends AMQPro
             // explicit rollbacks resend the message after the rollback-ok is sent
             if(_rollingBack)
             {
-                 _resendList.addAll(_ackedMessages);
+                for(MessageInstance entry : _ackedMessages)
+                {
+                    entry.unlockAcquisition();
+                }
+                _resendList.addAll(_ackedMessages);
             }
             else
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Aug  8 13:15:35 2014
@@ -20,11 +20,16 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.message.InstanceProperties;
@@ -34,14 +39,10 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.StateChangeListener;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
  * that was given out by the broker and the channel id. <p/>
@@ -57,7 +58,7 @@ public abstract class ConsumerTarget_0_8
                                          final MessageInstance.State oldSate,
                                          final MessageInstance.State newState)
                 {
-                    if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
+                    if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED)
                     {
                         restoreCredit(entry.getMessage());
                     }
@@ -74,8 +75,8 @@ public abstract class ConsumerTarget_0_8
 
 
     public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
-                                                             AMQShortString consumerTag, FieldTable filters,
-                                                             FlowCreditManager creditManager) throws AMQException
+                                                         AMQShortString consumerTag, FieldTable filters,
+                                                         FlowCreditManager creditManager) throws AMQException
     {
         return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
@@ -557,6 +558,11 @@ public abstract class ConsumerTarget_0_8
         });
     }
 
+    @Override
+    public void acquisitionRemoved(final MessageInstance node)
+    {
+    }
+
     public long getUnacknowledgedBytes()
     {
         return _unacknowledgedBytes.longValue();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Aug  8 13:15:35 2014
@@ -20,31 +20,28 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+
 public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
 {
     private final Object _lock = new Object();
 
-    private long _unackedSize;
-
     private Map<Long, MessageInstance> _map;
 
-    private long _lastDeliveryTag;
-
     private final int _prefetchLimit;
 
     public UnacknowledgedMessageMapImpl(int prefetchLimit)
     {
         _prefetchLimit = prefetchLimit;
-        _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
+        _map = new LinkedHashMap<>(prefetchLimit);
     }
 
     public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
@@ -81,12 +78,6 @@ public class UnacknowledgedMessageMapImp
         {
 
             MessageInstance message = _map.remove(deliveryTag);
-            if(message != null)
-            {
-                _unackedSize -= message.getMessage().getSize();
-
-            }
-
             return message;
         }
     }
@@ -109,8 +100,6 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
             _map.put(deliveryTag, message);
-            _unackedSize += message.getMessage().getSize();
-            _lastDeliveryTag = deliveryTag;
         }
     }
 
@@ -119,8 +108,7 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
             Collection<MessageInstance> currentEntries = _map.values();
-            _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
-            _unackedSize = 0l;
+            _map = new LinkedHashMap<>(_prefetchLimit);
             return currentEntries;
         }
     }
@@ -138,7 +126,6 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
             _map.clear();
-            _unackedSize = 0l;
         }
     }
 
@@ -163,6 +150,14 @@ public class UnacknowledgedMessageMapImp
         Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
         collect(deliveryTag, multiple, ackedMessageMap);
         remove(ackedMessageMap);
+        List<MessageInstance> acknowledged = new ArrayList<>();
+        for(MessageInstance instance : ackedMessageMap.values())
+        {
+            if(instance.lockAcquisition())
+            {
+                acknowledged.add(instance);
+            }
+        }
         return ackedMessageMap.values();
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Aug  8 13:15:35 2014
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.apache.qpid.amqp_1_0.codec.ValueHandler;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-
 class ConsumerTarget_1_0 extends AbstractConsumerTarget
 {
     private final boolean _acquires;
@@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
             if(outcome instanceof Accepted)
             {
+                _queueEntry.lockAcquisition();
                 txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
                         new ServerTransaction.Action()
                         {
@@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends Abstrac
                                 modified.setDeliveryFailed(true);
                                 _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
                                 _link.getEndpoint().sendFlowConditional();
+                                _queueEntry.unlockAcquisition();
                             }
                         }
                     });
@@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
     @Override
+    public void acquisitionRemoved(final MessageInstance node)
+    {
+    }
+
+    @Override
     public void consumerAdded(final ConsumerImpl sub)
     {
         _consumer = sub;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Aug  8 13:15:35 2014
@@ -636,19 +636,21 @@ public class SendingLink_1_0 implements 
                     AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
                     if(_consumer.acquires())
                     {
-                        txn.dequeue(Collections.singleton(queueEntry),
-                                new ServerTransaction.Action()
-                                {
-                                    public void postCommit()
-                                    {
-                                        queueEntry.delete();
-                                    }
-
-                                    public void onRollback()
-                                    {
-                                        //To change body of implemented methods use File | Settings | File Templates.
-                                    }
-                                });
+                        if(queueEntry.acquire() || queueEntry.isAcquired())
+                        {
+                            txn.dequeue(Collections.singleton(queueEntry),
+                                        new ServerTransaction.Action()
+                                        {
+                                            public void postCommit()
+                                            {
+                                                queueEntry.delete();
+                                            }
+
+                                            public void onRollback()
+                                            {
+                                            }
+                                        });
+                        }
                     }
                 }
                 else if(outcome instanceof Released)

Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Fri Aug  8 13:15:35 2014
@@ -1071,6 +1071,12 @@ class ManagementNode implements MessageS
         }
 
         @Override
+        public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+        {
+            return false;
+        }
+
+        @Override
         public void setRedelivered()
         {
 
@@ -1119,6 +1125,18 @@ class ManagementNode implements MessageS
         }
 
         @Override
+        public boolean lockAcquisition()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean unlockAcquisition()
+        {
+            return false;
+        }
+
+        @Override
         public int getMaximumDeliveryCount()
         {
             return 0;

Modified: qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Fri Aug  8 13:15:35 2014
@@ -90,6 +90,12 @@ class ManagementResponse implements Mess
     }
 
     @Override
+    public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+    {
+        return consumer == _consumer;
+    }
+
+    @Override
     public void setRedelivered()
     {
         _isRedelivered = true;
@@ -138,6 +144,18 @@ class ManagementResponse implements Mess
     }
 
     @Override
+    public boolean lockAcquisition()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean unlockAcquisition()
+    {
+        return false;
+    }
+
+    @Override
     public int getMaximumDeliveryCount()
     {
         return 0;
@@ -190,7 +208,7 @@ class ManagementResponse implements Mess
     @Override
     public void delete()
     {
-        // TODO
+        _isDeleted = true;
     }
 
     @Override

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?rev=1616742&r1=1616741&r2=1616742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Fri Aug  8 13:15:35 2014
@@ -21,15 +21,8 @@
 
 package org.apache.qpid.server.queue;
 
-import org.junit.Assert;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -39,8 +32,17 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TopicSubscriber;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import javax.naming.NamingException;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class TimeToLiveTest extends QpidBrokerTestCase
 {
@@ -53,18 +55,29 @@ public class TimeToLiveTest extends Qpid
     private static final int MSG_COUNT = 50;
     private static final long SERVER_TTL_TIMEOUT = 60000L;
 
+    public void testPassiveTTLWithPrefetch() throws Exception
+    {
+        doTestPassiveTTL(true);
+    }
+
     public void testPassiveTTL() throws Exception
     {
+        doTestPassiveTTL(false);
+
+    }
+
+    private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException
+    {
         //Create Client 1
         Connection clientConnection = getConnection();
-        
+
         Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = clientSession.createQueue(QUEUE); 
-        
+        Queue queue = clientSession.createQueue(QUEUE);
+
         // Create then close the consumer so the queue is actually created
         // Closing it then reopening it ensures that the consumer shouldn't get messages
         // which should have expired and allows a shorter sleep period. See QPID-1418
-        
+
         MessageConsumer consumer = clientSession.createConsumer(queue);
         consumer.close();
 
@@ -79,6 +92,12 @@ public class TimeToLiveTest extends Qpid
 
         MessageProducer producer = producerSession.createProducer(queue);
 
+        consumer = clientSession.createConsumer(queue);
+        if(prefetchMessages)
+        {
+            clientConnection.start();
+        }
+
         //Set TTL
         int msg = 0;
         producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
@@ -96,7 +115,6 @@ public class TimeToLiveTest extends Qpid
 
         producerSession.commit();
 
-        consumer = clientSession.createConsumer(queue);
 
         // Ensure we sleep the required amount of time.
         ReentrantLock waitLock = new ReentrantLock();
@@ -124,6 +142,16 @@ public class TimeToLiveTest extends Qpid
 
         }
 
+        if(prefetchMessages)
+        {
+            clientConnection.close();
+            clientConnection = getConnection();
+
+            clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            queue = clientSession.createQueue(QUEUE);
+            consumer = clientSession.createConsumer(queue);
+        }
+
         clientConnection.start();
 
         //Receive Message 0
@@ -131,14 +159,14 @@ public class TimeToLiveTest extends Qpid
         Message receivedFirst = consumer.receive(5000);
         Message receivedSecond = consumer.receive(5000);
         Message receivedThird = consumer.receive(1000);
-        
+
         // Log the messages to help diagnosis incase of failure
         _logger.info("First:"+receivedFirst);
         _logger.info("Second:"+receivedSecond);
         _logger.info("Third:"+receivedThird);
 
         // Only first and last messages sent should survive expiry
-        Assert.assertNull("More messages received", receivedThird); 
+        Assert.assertNull("More messages received", receivedThird);
 
         Assert.assertNotNull("First message not received", receivedFirst);
         Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org