You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/04/11 14:27:07 UTC

svn commit: r1738576 - in /qpid/java/branches/6.0.x: ./ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/ja...

Author: kwall
Date: Mon Apr 11 12:27:07 2016
New Revision: 1738576

URL: http://svn.apache.org/viewvc?rev=1738576&view=rev
Log:
QPID-7154: [Java Broker] Ensure that dead letter paths always lock the queue entry acquisition

Merged with command:

svn merge -c 1737804,1737984,1738119 ^/qpid/java/trunk

Added:
    qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
      - copied, changed from r1737804, qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
Modified:
    qpid/java/branches/6.0.x/   (props changed)
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/java/branches/6.0.x/test-profiles/CPPExcludes   (contents, props changed)

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 11 12:27:07 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737835,1737992,1738231
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737804,1737835,1737984,1737992,1738119,1738231
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Apr 11 12:27:07 2016
@@ -251,10 +251,7 @@ public abstract class AbstractConsumerTa
             while((instance = _queue.poll()) != null)
             {
                 MessageInstance entry = instance.getEntry();
-                if(entry.isAcquiredBy(instance.getConsumer()))
-                {
-                    entry.release();
-                }
+                entry.release(instance.getConsumer());
                 instance.release();
             }
             doCloseInternal();

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Mon Apr 11 12:27:07 2016
@@ -70,7 +70,7 @@ public interface MessageInstance
 
     boolean acquire(ConsumerImpl sub);
 
-    boolean lockAcquisition();
+    boolean lockAcquisition(final ConsumerImpl consumer);
 
     boolean unlockAcquisition();
 
@@ -250,6 +250,8 @@ public interface MessageInstance
 
     void release();
 
+    void release(ConsumerImpl release);
+
     boolean resend();
 
     void delete();

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Mon Apr 11 12:27:07 2016
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -217,6 +218,14 @@ public class LastValueQueueList extends
 
             discardIfReleasedEntryIsNoLongerLatest();
         }
+
+        @Override
+        public void release(ConsumerImpl consumer)
+        {
+            super.release(consumer);
+
+            discardIfReleasedEntryIsNoLongerLatest();
+        }
 
         @Override
         protected void onDelete()

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Apr 11 12:27:07 2016
@@ -208,7 +208,6 @@ public abstract class QueueEntryImpl imp
     {
         return acquire(NON_CONSUMER_ACQUIRED_STATE);
     }
-
     private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
     {
         private final Runnable _task;
@@ -291,10 +290,10 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean lockAcquisition()
+    public boolean lockAcquisition(final ConsumerImpl consumer)
     {
         EntryState state = _state;
-        if(state instanceof ConsumerAcquiredState)
+        if(state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState) state).getConsumer() == consumer)
         {
             LockedAcquiredState lockedState = ((ConsumerAcquiredState) state).getLockedState();
             boolean updated = _stateUpdater.compareAndSet(this, state, lockedState);
@@ -304,7 +303,7 @@ public abstract class QueueEntryImpl imp
             }
             return updated;
         }
-        return state instanceof LockedAcquiredState;
+        return state instanceof LockedAcquiredState && ((LockedAcquiredState) state).getConsumer() == consumer;
     }
 
     @Override
@@ -375,36 +374,49 @@ public abstract class QueueEntryImpl imp
         }
     }
 
+    @Override
     public void release()
     {
         EntryState state = _state;
 
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
+            postRelease(state);
+        }
+    }
 
-            if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
-            {
-                getQueue().decrementUnackedMsgCount(this);
-            }
+    @Override
+    public void release(ConsumerImpl consumer)
+    {
+        EntryState state = _state;
+        if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+        {
+            postRelease(state);
+        }
+    }
 
-            if(!getQueue().isDeleted())
-            {
-                getQueue().requeue(this);
-                if(_stateChangeListeners != null)
-                {
-                    notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
-                }
+    private void postRelease(final EntryState previousState)
+    {
+        if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState)
+        {
+            getQueue().decrementUnackedMsgCount(this);
+        }
 
-            }
-            else if(acquire())
+        if(!getQueue().isDeleted())
+        {
+            getQueue().requeue(this);
+            if(_stateChangeListeners != null)
             {
-                routeToAlternate(null, null);
+                notifyStateChange(State.ACQUIRED, State.AVAILABLE);
             }
-        }
 
+        }
+        else if(acquire())
+        {
+            routeToAlternate(null, null);
+        }
     }
 
-
     @Override
     public QueueConsumer getDeliveredConsumer()
     {
@@ -527,6 +539,10 @@ public abstract class QueueEntryImpl imp
 
     public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
     {
+        if (!isAcquired())
+        {
+            throw new IllegalStateException("Illegal queue entry state. " + this + " is not acquired.");
+        }
         final AMQQueue currentQueue = getQueue();
         Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
         boolean autocommit =  txn == null;

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Apr 11 12:27:07 2016
@@ -448,7 +448,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean lockAcquisition()
+        public boolean lockAcquisition(final ConsumerImpl consumer)
         {
             return false;
         }
@@ -504,6 +504,12 @@ public abstract class AbstractSystemMess
         }
 
         @Override
+        public void release(ConsumerImpl consumer)
+        {
+            release();
+        }
+
+        @Override
         public boolean resend()
         {
             return false;

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Mon Apr 11 12:27:07 2016
@@ -58,6 +58,7 @@ import org.apache.qpid.server.txn.DtxBra
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
 
@@ -356,23 +357,33 @@ public class AsynchronousMessageStoreRec
                         {
                             final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
 
-                            entry.acquire();
-
-                            branch.dequeue(entry.getEnqueueRecord());
-
-                            branch.addPostTransactionAction(new ServerTransaction.Action()
+                            if (entry.acquire())
                             {
+                                branch.dequeue(entry.getEnqueueRecord());
 
-                                public void postCommit()
+                                branch.addPostTransactionAction(new ServerTransaction.Action()
                                 {
-                                    entry.delete();
-                                }
 
-                                public void onRollback()
-                                {
-                                    entry.release();
-                                }
-                            });
+                                    public void postCommit()
+                                    {
+                                        entry.delete();
+                                    }
+
+                                    public void onRollback()
+                                    {
+                                        entry.release();
+                                    }
+                                });
+                            }
+                            else
+                            {
+                                // Should never happen - dtx recovery is always synchronous and occurs before
+                                // any other message actors are allowed to act on the virtualhost.
+                                throw new ServerScopedRuntimeException(
+                                        "Distributed transaction dequeue handler failed to acquire " + entry +
+                                        " during recovery of queue " + queue);
+
+                            }
                         }
                         else
                         {

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Mon Apr 11 12:27:07 2016
@@ -53,6 +53,7 @@ import org.apache.qpid.server.txn.DtxBra
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
 
@@ -331,23 +332,33 @@ public class SynchronousMessageStoreReco
                     {
                         final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
 
-                        entry.acquire();
-
-                        branch.dequeue(entry.getEnqueueRecord());
-
-                        branch.addPostTransactionAction(new ServerTransaction.Action()
+                        if (entry.acquire())
                         {
+                            branch.dequeue(entry.getEnqueueRecord());
 
-                            public void postCommit()
+                            branch.addPostTransactionAction(new ServerTransaction.Action()
                             {
-                                entry.delete();
-                            }
 
-                            public void onRollback()
-                            {
-                                entry.release();
-                            }
-                        });
+                                public void postCommit()
+                                {
+                                    entry.delete();
+                                }
+
+                                public void onRollback()
+                                {
+                                    entry.release();
+                                }
+                            });
+                        }
+                        else
+                        {
+                            // Should never happen - dtx recovery is always synchronous and occurs before
+                            // any other message actors are allowed to act on the virtualhost.
+                            throw new ServerScopedRuntimeException(
+                                    "Distributed transaction dequeue handler failed to acquire " + entry +
+                                    " during recovery of queue " + queue);
+                        }
+
                     }
                     else
                     {

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Mon Apr 11 12:27:07 2016
@@ -101,7 +101,7 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public boolean lockAcquisition()
+    public boolean lockAcquisition(final ConsumerImpl consumer)
     {
         return false;
     }
@@ -157,6 +157,11 @@ public class MockMessageInstance impleme
     {
     }
 
+    @Override
+    public void release(final ConsumerImpl release)
+    {
+    }
+
     @Override
     public boolean resend()
     {

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Mon Apr 11 12:27:07 2016
@@ -170,13 +170,33 @@ public abstract class QueueEntryImplTest
 
         assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
         assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
-        assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition());
+        assertTrue("Should be able to lock queue entry",_queueEntry.lockAcquisition(consumer));
         assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
 
         _queueEntry.delete();
         assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted());
     }
 
+    public void testLockAcquisitionOwnership()
+    {
+        QueueConsumer consumer1 = newConsumer();
+        QueueConsumer consumer2 = newConsumer();
+
+        _queueEntry.acquire(consumer1);
+        assertTrue("Queue entry should be acquired by consumer1", _queueEntry.acquiredByConsumer());
+
+        assertTrue("Consumer1 relocking should be allowed", _queueEntry.lockAcquisition(consumer1));
+        assertFalse("Consumer2 should not be allowed", _queueEntry.lockAcquisition(consumer2));
+
+        _queueEntry.unlockAcquisition();
+
+        assertTrue("Queue entry should still be acquired by consumer1", _queueEntry.acquiredByConsumer());
+
+        _queueEntry.release(consumer1);
+
+        assertFalse("Queue entry should no longer be acquired by consumer1", _queueEntry.acquiredByConsumer());
+    }
+
     /**
      * A helper method to get entry state
      *

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Mon Apr 11 12:27:07 2016
@@ -341,7 +341,7 @@ public class StandardQueueTest extends A
         }
 
         @Override
-        public boolean lockAcquisition()
+        public boolean lockAcquisition(final ConsumerImpl consumer)
         {
             return true;
         }

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java Mon Apr 11 12:27:07 2016
@@ -311,8 +311,10 @@ public class SynchronousMessageStoreReco
         Transaction.DequeueRecord dequeueRecord = createMockDequeueRecord(queueId, messageId);
 
         QueueEntry queueEntry = mock(QueueEntry.class);
+        when(queueEntry.acquire()).thenReturn(true);
         when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
 
+
         final long format = 1;
         final byte[] globalId = new byte[] {0};
         final byte[] branchId = new byte[] {0};

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Apr 11 12:27:07 2016
@@ -402,13 +402,12 @@ public class ConsumerTarget_0_10 extends
                            });
    }
 
-    void reject(final MessageInstance entry)
+    void reject(final ConsumerImpl consumer, final MessageInstance entry)
     {
         entry.setRedelivered();
-        entry.routeToAlternate(null, null);
-        if(isAcquiredByConsumer(entry))
+        if (entry.lockAcquisition(consumer))
         {
-            entry.delete();
+            entry.routeToAlternate(null, null);
         }
     }
 
@@ -423,7 +422,9 @@ public class ConsumerTarget_0_10 extends
         return false;
     }
 
-    void release(final MessageInstance entry, final boolean setRedelivered)
+    void release(final ConsumerImpl consumer,
+                 final MessageInstance entry,
+                 final boolean setRedelivered)
     {
         if (setRedelivered)
         {
@@ -437,29 +438,32 @@ public class ConsumerTarget_0_10 extends
 
         if (isMaxDeliveryLimitReached(entry))
         {
-            sendToDLQOrDiscard(entry);
+            sendToDLQOrDiscard(consumer, entry);
         }
         else
         {
-            entry.release();
+            entry.release(consumer);
         }
     }
 
-    protected void sendToDLQOrDiscard(MessageInstance entry)
+    protected void sendToDLQOrDiscard(final ConsumerImpl consumer, MessageInstance entry)
     {
         final ServerMessage msg = entry.getMessage();
 
-        int requeues = entry.routeToAlternate(new Action<MessageInstance>()
-                    {
-                        @Override
-                        public void performAction(final MessageInstance requeueEntry)
-                        {
-                            getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
-                                                                                   requeueEntry.getOwningResource()
-                                                                                           .getName()));
-                        }
-                    }, null);
-
+        int requeues = 0;
+        if (entry.lockAcquisition(consumer))
+        {
+            requeues = entry.routeToAlternate(new Action<MessageInstance>()
+            {
+                @Override
+                public void performAction(final MessageInstance requeueEntry)
+                {
+                    getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+                                                                           requeueEntry.getOwningResource()
+                                                                                   .getName()));
+                }
+            }, null);
+        }
         if (requeues == 0)
         {
             TransactionLogResource owningResource = entry.getOwningResource();
@@ -586,20 +590,6 @@ public class ConsumerTarget_0_10 extends
         return _stopped.get();
     }
 
-    public boolean deleteAcquired(MessageInstance entry)
-    {
-        if(isAcquiredByConsumer(entry))
-        {
-            acquisitionRemoved(entry);
-            entry.delete();
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
-
     @Override
     public void acquisitionRemoved(final MessageInstance entry)
     {

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Mon Apr 11 12:27:07 2016
@@ -47,40 +47,17 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onAccept()
     {
-        if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
-        {
-            _target.getSessionModel().acknowledge(_target, _entry);
-        }
-        else
-        {
-            _logger.debug("MessageAccept received for message which is not been acquired - message may have expired or been removed");
-        }
-
+        _target.getSessionModel().acknowledge(_consumer, _target, _entry);
     }
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_target != null && _entry.isAcquiredBy(_consumer))
-        {
-            _target.release(_entry, setRedelivered);
-        }
-        else
-        {
-            _logger.debug("MessageRelease received for message which has not been acquired - message may have expired or been removed");
-        }
+        _target.release(_consumer, _entry, setRedelivered);
     }
 
     public void onReject()
     {
-        if(_target != null && _entry.isAcquiredBy(_consumer))
-        {
-            _target.reject(_entry);
-        }
-        else
-        {
-            _logger.debug("MessageReject received for message which has not been acquired - message may have expired or been removed");
-        }
-
+        _target.reject(_consumer, _entry);
     }
 
     public boolean acquire()

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Mon Apr 11 12:27:07 2016
@@ -51,27 +51,13 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_entry.isAcquiredBy(_consumer))
-        {
-            _target.release(_entry, setRedelivered);
-        }
-        else
-        {
-            _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
-        }
+        _target.release(_consumer, _entry, setRedelivered);
+
     }
 
     public void onReject()
     {
-        if(_entry.isAcquiredBy(_consumer))
-        {
-            _target.reject(_entry);
-        }
-        else
-        {
-            _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
-        }
-
+        _target.reject(_consumer, _entry);
     }
 
     public boolean acquire()

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Mon Apr 11 12:27:07 2016
@@ -58,10 +58,7 @@ public class MessageAcceptCompletionList
         {
             _sub.getCreditManager().restoreCredit(1l, _messageSize);
         }
-        if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
-        {
-            _session.acknowledge(_sub, _entry);
-        }
+        _session.acknowledge(_consumer, _sub, _entry);
 
         _session.removeDispositionListener(method);
     }

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Apr 11 12:27:07 2016
@@ -75,7 +75,6 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.*;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AMQPConnection;
@@ -524,25 +523,31 @@ public class ServerSession extends Sessi
         // Broker shouldn't block awaiting close - thus do override this method to do nothing
     }
 
-    public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
+    public void acknowledge(final ConsumerImpl consumer,
+                            final ConsumerTarget_0_10 target,
+                            final MessageInstance entry)
     {
-        _transaction.dequeue(entry.getEnqueueRecord(),
-                             new ServerTransaction.Action()
-                             {
-
-                                 public void postCommit()
+        if (entry.lockAcquisition(consumer))
+        {
+            _transaction.dequeue(entry.getEnqueueRecord(),
+                                 new ServerTransaction.Action()
                                  {
-                                     sub.deleteAcquired(entry);
-                                 }
 
-                                 public void onRollback()
-                                 {
-                                     // The client has acknowledge the message and therefore have seen it.
-                                     // In the event of rollback, the message must be marked as redelivered.
-                                     entry.setRedelivered();
-                                     entry.release();
-                                 }
-                             });
+                                     public void postCommit()
+                                     {
+                                         target.acquisitionRemoved(entry);
+                                         entry.delete();
+                                     }
+
+                                     public void onRollback()
+                                     {
+                                         // The client has acknowledged the message and has therefore seen it.
+                                         // In the event of rollback, the message must be marked as redelivered.
+                                         entry.setRedelivered();
+                                         entry.release(consumer);
+                                     }
+                                 });
+        }
     }
 
     Collection<ConsumerTarget_0_10> getSubscriptions()

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Apr 11 12:27:07 2016
@@ -953,7 +953,7 @@ public class AMQChannel
      * this same channel or to other subscribers.
      *
      */
-    public void requeue()
+    private void requeue()
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -973,7 +973,7 @@ public class AMQChannel
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            unacked.release();
+            unacked.release(unacked.getAcquiringConsumer());
         }
 
     }
@@ -994,8 +994,7 @@ public class AMQChannel
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            unacked.release();
-
+            unacked.release(unacked.getAcquiringConsumer());
         }
         else
         {
@@ -1035,7 +1034,7 @@ public class AMQChannel
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
      */
-    public void resend()
+    private void resend()
     {
 
 
@@ -1108,7 +1107,7 @@ public class AMQChannel
             _unacknowledgedMessageMap.remove(deliveryTag);
 
             message.setRedelivered();
-            message.release();
+            message.release(message.getAcquiringConsumer());
 
         }
     }
@@ -1122,7 +1121,7 @@ public class AMQChannel
      *                    acknowledges the single message specified by the delivery tag
      *
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    private void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1255,7 +1254,7 @@ public class AMQChannel
         _uncommittedMessages.clear();
     }
 
-    public void rollback(Runnable postRollbackTask)
+    private void rollback(Runnable postRollbackTask)
     {
 
         // stop all subscriptions
@@ -1284,10 +1283,10 @@ public class AMQChannel
 
         for(MessageInstance entry : _resendList)
         {
-            ConsumerImpl sub = entry.getDeliveredConsumer();
-            if(sub == null || sub.isClosed())
+            ConsumerImpl sub = entry.getAcquiringConsumer();
+            if (sub == null || sub.isClosed())
             {
-                entry.release();
+                entry.release(sub);
             }
             else
             {
@@ -1627,7 +1626,7 @@ public class AMQChannel
                 {
                     for(MessageInstance entry : _ackedMessages)
                     {
-                        entry.release();
+                        entry.release(entry.getAcquiringConsumer());
                     }
                 }
                 finally
@@ -1768,7 +1767,7 @@ public class AMQChannel
         _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
     }
 
-    public void deadLetter(long deliveryTag)
+    private void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
         final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
@@ -1780,9 +1779,10 @@ public class AMQChannel
         else
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
-
-
-            int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
+            int requeues = 0;
+            if (rejectedQueueEntry.lockAcquisition(rejectedQueueEntry.getAcquiringConsumer()))
+            {
+                requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
                 {
                     @Override
                     public void performAction(final MessageInstance requeueEntry)
@@ -1793,6 +1793,7 @@ public class AMQChannel
                                                                                                         .getName()));
                     }
                 }, null);
+            }
 
             if(requeues == 0)
             {

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Mon Apr 11 12:27:07 2016
@@ -62,10 +62,6 @@ public class ExtractResendAndRequeue imp
                 _msgToRequeue.put(deliveryTag, message);
             }
         }
-        else
-        {
-            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
-        }
 
         // false means continue processing
         return false;

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Mon Apr 11 12:27:07 2016
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -156,7 +155,7 @@ public class UnacknowledgedMessageMapImp
             List<MessageInstance> acknowledged = new ArrayList<>();
             for (MessageInstance instance : ackedMessageMap.values())
             {
-                if (instance.lockAcquisition())
+                if (instance.lockAcquisition(instance.getAcquiringConsumer()))
                 {
                     acknowledged.add(instance);
                 }
@@ -170,7 +169,7 @@ public class UnacknowledgedMessageMapImp
             {
                 instance = remove(deliveryTag);
             }
-            if(instance != null && instance.lockAcquisition())
+            if(instance != null && instance.lockAcquisition(instance.getAcquiringConsumer()))
             {
                 return Collections.singleton(instance);
             }

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Mon Apr 11 12:27:07 2016
@@ -25,11 +25,14 @@ import static org.mockito.Mockito.when;
 
 import java.util.Collection;
 
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class UnacknowledgedMessageMapTest extends QpidTestCase
 {
+    private final ConsumerImpl _consumer = mock(ConsumerImpl.class);
+
     public void testDeletedMessagesCantBeAcknowledged()
     {
         UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100);
@@ -47,8 +50,8 @@ public class UnacknowledgedMessageMapTes
         map = new UnacknowledgedMessageMapImpl(100);
         msgs = populateMap(map,expectedSize);
         // simulate some messages being ttl expired
-        when(msgs[2].lockAcquisition()).thenReturn(Boolean.FALSE);
-        when(msgs[4].lockAcquisition()).thenReturn(Boolean.FALSE);
+        when(msgs[2].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
+        when(msgs[4].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
 
         assertEquals(expectedSize,map.size());
 
@@ -77,7 +80,8 @@ public class UnacknowledgedMessageMapTes
     private MessageInstance createMessageInstance(final int id)
     {
         MessageInstance instance = mock(MessageInstance.class);
-        when(instance.lockAcquisition()).thenReturn(Boolean.TRUE);
+        when(instance.lockAcquisition(_consumer)).thenReturn(Boolean.TRUE);
+        when(instance.getAcquiringConsumer()).thenReturn(_consumer);
         return instance;
     }
 }

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Apr 11 12:27:07 2016
@@ -240,13 +240,8 @@ class ConsumerTarget_1_0 extends Abstrac
 
                                 public void onRollback()
                                 {
-                                    if(entry.isAcquiredBy(getConsumer()))
-                                    {
-                                        entry.release();
-                                        _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
-
-
-                                    }
+                                    entry.release(getConsumer());
+                                    _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
                                 }
                             });
                         }
@@ -257,7 +252,7 @@ class ConsumerTarget_1_0 extends Abstrac
                 }
                 else
                 {
-                    entry.release();
+                    entry.release(getConsumer());
                 }
             }
         }
@@ -393,24 +388,26 @@ class ConsumerTarget_1_0 extends Abstrac
 
             if(outcome instanceof Accepted)
             {
-                _queueEntry.lockAcquisition();
-                txn.dequeue(_queueEntry.getEnqueueRecord(),
-                        new ServerTransaction.Action()
-                        {
-
-                            public void postCommit()
-                            {
-                                if(_queueEntry.isAcquiredBy(getConsumer()))
+                if (_queueEntry.lockAcquisition(getConsumer()))
+                {
+                    txn.dequeue(_queueEntry.getEnqueueRecord(),
+                                new ServerTransaction.Action()
                                 {
-                                    _queueEntry.delete();
-                                }
-                            }
 
-                            public void onRollback()
-                            {
+                                    public void postCommit()
+                                    {
+                                        if (_queueEntry.isAcquiredBy(getConsumer()))
+                                        {
+                                            _queueEntry.delete();
+                                        }
+                                    }
 
-                            }
-                        });
+                                    public void onRollback()
+                                    {
+
+                                    }
+                                });
+                }
                 txn.addPostTransactionAction(new ServerTransaction.Action()
                     {
                         public void postCommit()
@@ -429,7 +426,7 @@ class ConsumerTarget_1_0 extends Abstrac
                                 _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
                                 _link.getEndpoint().sendFlowConditional();
                                 _queueEntry.incrementDeliveryCount();
-                                _queueEntry.release();
+                                _queueEntry.release(getConsumer());
                             }
                         }
                     });
@@ -441,7 +438,7 @@ class ConsumerTarget_1_0 extends Abstrac
                     public void postCommit()
                     {
 
-                        _queueEntry.release();
+                        _queueEntry.release(getConsumer());
                         _link.getEndpoint().settle(_deliveryTag);
                     }
 
@@ -459,7 +456,7 @@ class ConsumerTarget_1_0 extends Abstrac
                     public void postCommit()
                     {
 
-                        _queueEntry.release();
+                        _queueEntry.release(getConsumer());
                         if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
                         {
                             _queueEntry.incrementDeliveryCount();

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Apr 11 12:27:07 2016
@@ -621,7 +621,7 @@ public class SendingLink_1_0 implements
             if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
             {
                 queueEntry.setRedelivered();
-                queueEntry.release();
+                queueEntry.release(_consumer);
                 _unsettledMap.remove(deliveryTag);
             }
             else if(initialUnsettledMap.get(deliveryTag) instanceof Outcome)
@@ -660,12 +660,11 @@ public class SendingLink_1_0 implements
                                 {
                                     public void postCommit()
                                     {
-                                        queueEntry.release();
+                                        queueEntry.release(_consumer);
                                     }
 
                                     public void onRollback()
                                     {
-                                        //To change body of implemented methods use File | Settings | File Templates.
                                     }
                                 });
                     }

Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Mon Apr 11 12:27:07 2016
@@ -1118,7 +1118,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean lockAcquisition()
+        public boolean lockAcquisition(final ConsumerImpl consumer)
         {
             return false;
         }
@@ -1172,6 +1172,12 @@ class ManagementNode implements MessageS
         {
 
         }
+
+        @Override
+        public void release(final ConsumerImpl release)
+        {
+
+        }
 
         @Override
         public boolean resend()

Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Mon Apr 11 12:27:07 2016
@@ -157,7 +157,7 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean lockAcquisition()
+    public boolean lockAcquisition(final ConsumerImpl consumer)
     {
         return false;
     }
@@ -213,6 +213,12 @@ class ManagementResponse implements Mess
     }
 
     @Override
+    public void release(final ConsumerImpl release)
+    {
+        release();
+    }
+
+    @Override
     public boolean resend()
     {
         return false;

Copied: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java (from r1737804, qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java?p2=qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java&p1=qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java&r1=1737804&r2=1738576&rev=1738576&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java Mon Apr 11 12:27:07 2016
@@ -35,6 +35,10 @@ import javax.jms.Session;
 
 import org.apache.qpid.client.RejectBehaviour;
 import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.Port;
 import org.apache.qpid.systest.rest.RestTestHelper;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -47,7 +51,21 @@ public class LiveQueueOperationsTest ext
     @Override
     protected void setUp() throws Exception
     {
-        getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+        TestBrokerConfiguration config = getBrokerConfiguration();
+        config.addHttpManagementConfiguration();
+        config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, getHttpManagementPort(getPort()));
+        config.removeObjectConfiguration(Port.class, TestBrokerConfiguration.ENTRY_NAME_JMX_PORT);
+        config.removeObjectConfiguration(Port.class, TestBrokerConfiguration.ENTRY_NAME_RMI_PORT);
+
+        config.setObjectAttribute(AuthenticationProvider.class, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+                                  "secureOnlyMechanisms",
+                                  "{}");
+
+        // set password authentication provider on http port for the tests
+        config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER,
+                                  TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+        config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true);
+
         setTestSystemProperty("queue.deadLetterQueueEnabled","true");
         setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
 
@@ -57,14 +75,15 @@ public class LiveQueueOperationsTest ext
         {
             setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
         }
-
         super.setUp();
+
     }
 
     public void testClearQueueOperationWithActiveConsumerDlqAll() throws Exception
     {
         final String virtualHostName = TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST;
-        RestTestHelper restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+        RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort()));
+        restTestHelper.setUsernameAndPassword("webadmin", "webadmin");
 
         Connection conn = getConnection();
         conn.start();

Modified: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/test-profiles/CPPExcludes?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/test-profiles/CPPExcludes (original)
+++ qpid/java/branches/6.0.x/test-profiles/CPPExcludes Mon Apr 11 12:27:07 2016
@@ -109,6 +109,7 @@ org.apache.qpid.systest.management.jmx.*
 
 // JMX is used in this test for validation
 org.apache.qpid.server.queue.ModelTest#*
+org.apache.qpid.server.queue.LiveQueueOperationsTest#*
 
 // 0-10 is not supported by the MethodRegistry
 org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*

Propchange: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 11 12:27:07 2016
@@ -6,4 +6,4 @@
 /qpid/branches/java-broker-vhost-refactor/java/test-profiles/CPPExcludes:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes:1061302-1072333
-/qpid/java/trunk/test-profiles/CPPExcludes:1715446,1732812,1736751,1736838
+/qpid/java/trunk/test-profiles/CPPExcludes:1715446,1732812,1736751,1736838,1737804



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org