You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/09/09 14:49:41 UTC

svn commit: r1760032 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apach...

Author: kwall
Date: Fri Sep  9 14:49:41 2016
New Revision: 1760032

URL: http://svn.apache.org/viewvc?rev=1760032&view=rev
Log:
QPID-7417: [Java Broker] Ensure message instance listeners only fire on state change of the associated object

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Fri Sep  9 14:49:41 2016
@@ -226,7 +226,7 @@ public abstract class AbstractConsumerTa
 
                 if (consumer.acquires())
                 {
-                    entry.unlockAcquisition();
+                    entry.makeAcquisitionStealable();
                 }
             }
             finally

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Fri Sep  9 14:49:41 2016
@@ -44,9 +44,9 @@ public interface MessageInstance
 
     void decrementDeliveryCount();
 
-    void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener);
+    void addStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener);
 
-    boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener);
+    boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener);
 
     boolean acquiredByConsumer();
 
@@ -70,9 +70,9 @@ public interface MessageInstance
 
     boolean acquire(ConsumerImpl sub);
 
-    boolean lockAcquisition(final ConsumerImpl consumer);
+    boolean makeAcquisitionUnstealable(final ConsumerImpl consumer);
 
-    boolean unlockAcquisition();
+    boolean makeAcquisitionStealable();
 
     int getMaximumDeliveryCount();
 
@@ -84,7 +84,7 @@ public interface MessageInstance
 
     MessageEnqueueRecord getEnqueueRecord();
 
-    public static enum State
+    enum State
     {
         AVAILABLE,
         ACQUIRED,
@@ -92,7 +92,7 @@ public interface MessageInstance
         DELETED
     }
 
-    public abstract class EntryState
+    abstract class EntryState
     {
         protected EntryState()
         {
@@ -114,7 +114,7 @@ public interface MessageInstance
     }
 
 
-    public final class AvailableState extends EntryState
+    final class AvailableState extends EntryState
     {
 
         public State getState()
@@ -129,7 +129,7 @@ public interface MessageInstance
     }
 
 
-    public final class DequeuedState extends EntryState
+    final class DequeuedState extends EntryState
     {
 
         public State getState()
@@ -144,7 +144,7 @@ public interface MessageInstance
     }
 
 
-    public final class DeletedState extends EntryState
+    final class DeletedState extends EntryState
     {
 
         public State getState()
@@ -158,7 +158,7 @@ public interface MessageInstance
         }
     }
 
-    public final class NonConsumerAcquiredState extends EntryState
+    final class NonConsumerAcquiredState extends EntryState
     {
         public State getState()
         {
@@ -171,76 +171,72 @@ public interface MessageInstance
         }
     }
 
-    public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
+    abstract class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
     {
-        private final C _consumer;
-        private final LockedAcquiredState<C> _lockedState;
+        public abstract C getConsumer();
 
-        public ConsumerAcquiredState(C consumer)
+        @Override
+        public final State getState()
         {
-            _consumer = consumer;
-            _lockedState = new LockedAcquiredState<>(this);
+            return State.ACQUIRED;
         }
 
-
-        public State getState()
+        @Override
+        public String toString()
         {
-            return State.ACQUIRED;
+            return "{" + getState().name() + " : " + getConsumer() +"}";
         }
+    }
 
-        public C getConsumer()
+    final class StealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+    {
+        private final C _consumer;
+        private final UnstealableConsumerAcquiredState<C> _unstealableState;
+
+        public StealableConsumerAcquiredState(C consumer)
         {
-            return _consumer;
+            _consumer = consumer;
+            _unstealableState = new UnstealableConsumerAcquiredState<>(this);
         }
 
-        public String toString()
+        @Override
+        public C getConsumer()
         {
-            return "{" + getState().name() + " : " + _consumer +"}";
+            return _consumer;
         }
 
-        public LockedAcquiredState<C> getLockedState()
+        public UnstealableConsumerAcquiredState<C> getUnstealableState()
         {
-            return _lockedState;
+            return _unstealableState;
         }
-
     }
 
-    public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+    final class UnstealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
     {
-        private final ConsumerAcquiredState<C> _acquiredState;
+        private final StealableConsumerAcquiredState<C> _stealableState;
 
-        public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+        public UnstealableConsumerAcquiredState(final StealableConsumerAcquiredState<C> stealableState)
         {
-            _acquiredState = acquiredState;
+            _stealableState = stealableState;
         }
 
         @Override
-        public State getState()
-        {
-            return State.ACQUIRED;
-        }
-
         public C getConsumer()
         {
-            return _acquiredState.getConsumer();
-        }
-
-        public String toString()
-        {
-            return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+            return _stealableState.getConsumer();
         }
 
-        public ConsumerAcquiredState<C> getUnlockedState()
+        public StealableConsumerAcquiredState<C> getStealableState()
         {
-            return _acquiredState;
+            return _stealableState;
         }
     }
 
 
-    final static EntryState AVAILABLE_STATE = new AvailableState();
-    final static EntryState DELETED_STATE = new DeletedState();
-    final static EntryState DEQUEUED_STATE = new DequeuedState();
-    final static EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState();
+    EntryState AVAILABLE_STATE = new AvailableState();
+    EntryState DELETED_STATE = new DeletedState();
+    EntryState DEQUEUED_STATE = new DequeuedState();
+    EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState();
 
     boolean isAvailable();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Fri Sep  9 14:49:41 2016
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -172,7 +174,7 @@ public class DefinedGroupMessageGroupMan
 
             _groupMap.put(groupId, group);
 
-            // there's a small change that the group became empty between the point at which getNextAvailable() was
+            // there's a small chance that the group became empty between the point at which getNextAvailable() was
             // called on the consumer, and when accept message is called... in that case we want to avoid delivering
             // out of order
             if(_resetHelper.isEntryAheadOfConsumer(entry, sub))
@@ -256,7 +258,7 @@ public class DefinedGroupMessageGroupMan
         return groupVal;
     }
 
-    private class GroupStateChangeListener implements StateChangeListener<MessageInstance, MessageInstance.State>
+    private class GroupStateChangeListener implements StateChangeListener<MessageInstance, EntryState>
     {
         private final Group _group;
 
@@ -265,24 +267,20 @@ public class DefinedGroupMessageGroupMan
             _group = group;
         }
 
-        public void stateChanged(final MessageInstance entry,
-                                 final MessageInstance.State oldState,
-                                 final MessageInstance.State newState)
+        @Override
+        public void stateChanged(final MessageInstance entry, final EntryState oldState, final EntryState newState)
         {
             synchronized (DefinedGroupMessageGroupManager.this)
             {
                 if(_group.isValid())
                 {
-                    if(oldState != newState)
+                    if (isConsumerAcquiredStateForThisGroup(newState) && !isConsumerAcquiredStateForThisGroup(oldState))
+                    {
+                        _group.add();
+                    }
+                    else if (isConsumerAcquiredStateForThisGroup(oldState) && !isConsumerAcquiredStateForThisGroup(newState))
                     {
-                        if(newState == QueueEntry.State.ACQUIRED)
-                        {
-                            _group.add();
-                        }
-                        else if(oldState == QueueEntry.State.ACQUIRED)
-                        {
-                            _group.subtract((QueueEntry) entry, newState == MessageInstance.State.AVAILABLE);
-                        }
+                        _group.subtract((QueueEntry) entry, newState.getState() == MessageInstance.State.AVAILABLE);
                     }
                 }
                 else
@@ -291,5 +289,11 @@ public class DefinedGroupMessageGroupMan
                 }
             }
         }
+
+        private boolean isConsumerAcquiredStateForThisGroup(EntryState state)
+        {
+            return state instanceof ConsumerAcquiredState
+                   && ((ConsumerAcquiredState) state).getConsumer() == _group.getConsumer();
+        }
     }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Sep  9 14:49:41 2016
@@ -46,7 +46,7 @@ public interface QueueConsumer<X extends
 
     boolean resend(QueueEntry e);
 
-    MessageInstance.ConsumerAcquiredState<X> getOwningState();
+    MessageInstance.StealableConsumerAcquiredState<X> getOwningState();
 
     QueueContext getQueueContext();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Sep  9 14:49:41 2016
@@ -71,7 +71,8 @@ class QueueConsumerImpl
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final long _consumerNumber;
     private final long _createTime = System.currentTimeMillis();
-    private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
+    private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl>
+            _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
     private final WaitingOnCreditMessageListener _waitingOnCreditMessageListener = new WaitingOnCreditMessageListener();
     private final boolean _acquires;
     private final boolean _seesRequeues;
@@ -538,7 +539,7 @@ class QueueConsumerImpl
         return _createTime;
     }
 
-    public final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> getOwningState()
+    public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl> getOwningState()
     {
         return _owningState;
     }
@@ -644,7 +645,7 @@ class QueueConsumerImpl
         return _queue.getEventLogger();
     }
 
-    public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, QueueEntry.State>
+    public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, MessageInstance.EntryState>
     {
         private final AtomicReference<MessageInstance> _entry = new AtomicReference<>();
 
@@ -675,7 +676,8 @@ class QueueConsumerImpl
 
         }
 
-        public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
+        @Override
+        public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
         {
             entry.removeStateChangeListener(this);
             _entry.compareAndSet(entry, null);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Sep  9 14:49:41 2016
@@ -81,7 +81,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile StateChangeListenerEntry<? super QueueEntry, State> _stateChangeListeners;
+    private volatile StateChangeListenerEntry<? super QueueEntry, EntryState> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, StateChangeListenerEntry>
@@ -224,7 +224,7 @@ public abstract class QueueEntryImpl imp
     {
         return acquire(NON_CONSUMER_ACQUIRED_STATE);
     }
-    private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
+    private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, EntryState>
     {
         private final Runnable _task;
         private final AtomicBoolean _run = new AtomicBoolean();
@@ -235,13 +235,13 @@ public abstract class QueueEntryImpl imp
         }
 
         @Override
-        public void stateChanged(final MessageInstance object, final State oldState, final State newState)
+        public void stateChanged(final MessageInstance object, final EntryState oldState, final EntryState newState)
         {
-            if(newState == State.DELETED || newState == State.DEQUEUED)
+            if (newState.equals(DELETED_STATE) || newState.equals(DEQUEUED_STATE))
             {
                 QueueEntryImpl.this.removeStateChangeListener(this);
             }
-            else if(acquireOrSteal(null))
+            else if (acquireOrSteal(null))
             {
                 runTask();
             }
@@ -288,7 +288,7 @@ public abstract class QueueEntryImpl imp
 
         EntryState currentState;
 
-        while((currentState = _state).getState() == State.AVAILABLE)
+        while((currentState = _state).equals(AVAILABLE_STATE))
         {
             if(acquired = _stateUpdater.compareAndSet(this, currentState, state))
             {
@@ -298,7 +298,7 @@ public abstract class QueueEntryImpl imp
 
         if(acquired && _stateChangeListeners != null)
         {
-            notifyStateChange(State.AVAILABLE, State.ACQUIRED);
+            notifyStateChange(AVAILABLE_STATE, state);
         }
 
         return acquired;
@@ -306,7 +306,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquire(ConsumerImpl sub)
     {
-        final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getLockedState());
+        final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -316,33 +316,35 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean lockAcquisition(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
     {
         EntryState state = _state;
-        if(state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState) state).getConsumer() == consumer)
+        if(state instanceof StealableConsumerAcquiredState
+           && ((StealableConsumerAcquiredState) state).getConsumer() == consumer)
         {
-            LockedAcquiredState lockedState = ((ConsumerAcquiredState) state).getLockedState();
-            boolean updated = _stateUpdater.compareAndSet(this, state, lockedState);
+            UnstealableConsumerAcquiredState unstealableState = ((StealableConsumerAcquiredState) state).getUnstealableState();
+            boolean updated = _stateUpdater.compareAndSet(this, state, unstealableState);
             if(updated)
             {
-                notifyStateChange(state.getState(), lockedState.getState());
+                notifyStateChange(state, unstealableState);
             }
             return updated;
         }
-        return state instanceof LockedAcquiredState && ((LockedAcquiredState) state).getConsumer() == consumer;
+        return state instanceof UnstealableConsumerAcquiredState
+               && ((UnstealableConsumerAcquiredState) state).getConsumer() == consumer;
     }
 
     @Override
-    public boolean unlockAcquisition()
+    public boolean makeAcquisitionStealable()
     {
         EntryState state = _state;
-        if(state instanceof LockedAcquiredState)
+        if(state instanceof UnstealableConsumerAcquiredState)
         {
-            ConsumerAcquiredState unlockedState = ((LockedAcquiredState) state).getUnlockedState();
-            boolean updated = _stateUpdater.compareAndSet(this, state, unlockedState);
+            StealableConsumerAcquiredState stealableState = ((UnstealableConsumerAcquiredState) state).getStealableState();
+            boolean updated = _stateUpdater.compareAndSet(this, state, stealableState);
             if(updated)
             {
-                notifyStateChange(state.getState(),unlockedState.getState());
+                notifyStateChange(state, stealableState);
             }
             return updated;
         }
@@ -351,8 +353,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquiredByConsumer()
     {
-
-        return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
+        return _state instanceof ConsumerAcquiredState;
     }
 
     @Override
@@ -360,14 +361,10 @@ public abstract class QueueEntryImpl imp
     {
         ConsumerImpl consumer;
         EntryState state = _state;
-        if(state instanceof ConsumerAcquiredState)
+        if (state instanceof ConsumerAcquiredState)
         {
             consumer = ((ConsumerAcquiredState)state).getConsumer();
         }
-        else if(state instanceof LockedAcquiredState)
-        {
-            consumer = ((LockedAcquiredState)state).getConsumer();
-        }
         else
         {
             consumer = null;
@@ -379,20 +376,22 @@ public abstract class QueueEntryImpl imp
     public boolean isAcquiredBy(ConsumerImpl consumer)
     {
         EntryState state = _state;
-        return (state instanceof ConsumerAcquiredState
-               && ((ConsumerAcquiredState)state).getConsumer() == consumer)
-                || (state instanceof LockedAcquiredState
-                    && ((LockedAcquiredState)state).getConsumer() == consumer);
+        return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer);
     }
 
     @Override
     public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
     {
         EntryState state = _state;
-        if(state instanceof ConsumerAcquiredState
-               && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+        if(state instanceof StealableConsumerAcquiredState
+               && ((StealableConsumerAcquiredState)state).getConsumer() == consumer)
         {
-            return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+            final boolean stateWasChanged = _stateUpdater.compareAndSet(this, state, NON_CONSUMER_ACQUIRED_STATE);
+            if (stateWasChanged)
+            {
+                notifyStateChange(state, NON_CONSUMER_ACQUIRED_STATE);
+            }
+            return stateWasChanged;
         }
         else
         {
@@ -423,7 +422,7 @@ public abstract class QueueEntryImpl imp
 
     private void postRelease(final EntryState previousState)
     {
-        if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState)
+        if (previousState instanceof ConsumerAcquiredState)
         {
             getQueue().decrementUnackedMsgCount(this);
         }
@@ -431,9 +430,9 @@ public abstract class QueueEntryImpl imp
         if(!getQueue().isDeleted())
         {
             getQueue().requeue(this);
-            if(_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
+            if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
             {
-                notifyStateChange(State.ACQUIRED, State.AVAILABLE);
+                notifyStateChange(previousState, AVAILABLE_STATE);
             }
 
         }
@@ -478,19 +477,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public QueueConsumer getDeliveredConsumer()
     {
-        EntryState state = _state;
-        if (state instanceof ConsumerAcquiredState)
-        {
-            return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
-        }
-        else if (state instanceof LockedAcquiredState)
-        {
-            return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
-        }
-        else
-        {
-            return null;
-        }
+        return (QueueConsumer) getAcquiringConsumer();
     }
 
     public void reject()
@@ -536,7 +523,7 @@ public abstract class QueueEntryImpl imp
 
         if(state.getState() == State.ACQUIRED)
         {
-            if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
+            if (state instanceof ConsumerAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount(this);
             }
@@ -544,7 +531,7 @@ public abstract class QueueEntryImpl imp
             getQueue().dequeue(this);
             if(_stateChangeListeners != null)
             {
-                notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+                notifyStateChange(state, DEQUEUED_STATE);
             }
             return true;
         }
@@ -555,12 +542,12 @@ public abstract class QueueEntryImpl imp
 
     }
 
-    private void notifyStateChange(final State oldState, final State newState)
+    private void notifyStateChange(final EntryState oldState, final EntryState newState)
     {
-        StateChangeListenerEntry<? super QueueEntry, State> entry = _listenersUpdater.get(this);
+        StateChangeListenerEntry<? super QueueEntry, EntryState> entry = _listenersUpdater.get(this);
         while(entry != null)
         {
-            StateChangeListener<? super QueueEntry, State> l = entry.getListener();
+            StateChangeListener<? super QueueEntry, EntryState> l = entry.getListener();
             if(l != null)
             {
                 l.stateChanged(this, oldState, newState);
@@ -651,16 +638,16 @@ public abstract class QueueEntryImpl imp
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener)
+    public void addStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener)
     {
-        StateChangeListenerEntry<? super QueueEntry, State> entry = new StateChangeListenerEntry<>(listener);
+        StateChangeListenerEntry<? super QueueEntry, EntryState> entry = new StateChangeListenerEntry<>(listener);
         if(!_listenersUpdater.compareAndSet(this,null, entry))
         {
             _listenersUpdater.get(this).add(entry);
         }
     }
 
-    public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener)
     {
         StateChangeListenerEntry entry = _listenersUpdater.get(this);
         if(entry != null)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java Fri Sep  9 14:49:41 2016
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.util;
 
-public interface StateChangeListener<T, E extends Enum>
+public interface StateChangeListener<T, E>
 {
     void stateChanged(T object, E oldState, E newState);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java Fri Sep  9 14:49:41 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.util;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public class StateChangeListenerEntry<T, E extends Enum>
+public class StateChangeListenerEntry<T, E>
 {
     private static final AtomicReferenceFieldUpdater<StateChangeListenerEntry, StateChangeListenerEntry> NEXT =
             AtomicReferenceFieldUpdater.newUpdater(StateChangeListenerEntry.class, StateChangeListenerEntry.class, "_next");

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Fri Sep  9 14:49:41 2016
@@ -356,13 +356,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
 
         }
 
         @Override
-        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
             return false;
         }
@@ -447,13 +447,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean lockAcquisition(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
         {
             return false;
         }
 
         @Override
-        public boolean unlockAcquisition()
+        public boolean makeAcquisitionStealable()
         {
             return false;
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Fri Sep  9 14:49:41 2016
@@ -412,12 +412,12 @@ abstract class AbstractQueueTestBase ext
         QueueEntry queueEntry = queueEntries.get(0);
 
         final CountDownLatch dequeueIndicator = new CountDownLatch(1);
-        queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
+        queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
         {
             @Override
-            public void stateChanged(MessageInstance object, MessageInstance.State oldState, MessageInstance.State newState)
+            public void stateChanged(MessageInstance object, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
             {
-                if (newState == MessageInstance.State.DEQUEUED)
+                if (newState.equals(MessageInstance.DEQUEUED_STATE))
                 {
                     dequeueIndicator.countDown();
                 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java Fri Sep  9 14:49:41 2016
@@ -55,7 +55,8 @@ public class ConsumerListTest extends Qp
     private QueueConsumer newMockConsumer()
     {
         QueueConsumer consumer = mock(QueueConsumer.class);
-        MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+        MessageInstance.StealableConsumerAcquiredState
+                owningState = new MessageInstance.StealableConsumerAcquiredState(consumer);
         when(consumer.getOwningState()).thenReturn(owningState);
 
         return consumer;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Fri Sep  9 14:49:41 2016
@@ -101,13 +101,13 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public boolean lockAcquisition(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
     {
         return false;
     }
 
     @Override
-    public boolean unlockAcquisition()
+    public boolean makeAcquisitionStealable()
     {
         return false;
     }
@@ -221,13 +221,13 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
 
     }
 
     @Override
-    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
         return false;
     }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Sep  9 14:49:41 2016
@@ -18,8 +18,12 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.apache.qpid.server.message.MessageInstance.NON_CONSUMER_ACQUIRED_STATE;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.lang.reflect.Field;
@@ -33,6 +37,8 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstance.StealableConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.UnstealableConsumerAcquiredState;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.BrokerModel;
@@ -41,6 +47,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -135,12 +142,31 @@ public abstract class QueueEntryImplTest
     {
         final QueueConsumer consumer = mock(QueueConsumer.class);
 
-        MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+        StealableConsumerAcquiredState
+                owningState = new StealableConsumerAcquiredState(consumer);
         when(consumer.getOwningState()).thenReturn(owningState);
         when(consumer.getConsumerNumber()).thenReturn(_consumerId++);
         return consumer;
     }
 
+    public void testStateChanges()
+    {
+        QueueConsumer consumer = newConsumer();
+        StateChangeListener<MessageInstance, EntryState> stateChangeListener = mock(StateChangeListener.class);
+        _queueEntry.addStateChangeListener(stateChangeListener);
+        _queueEntry.acquire(consumer);
+        verify(stateChangeListener).stateChanged(eq(_queueEntry),
+                                                 eq(MessageInstance.AVAILABLE_STATE),
+                                                 isA(UnstealableConsumerAcquiredState.class));
+        _queueEntry.makeAcquisitionStealable();
+        verify(stateChangeListener).stateChanged(eq(_queueEntry),
+                                                 isA(UnstealableConsumerAcquiredState.class),
+                                                 isA(StealableConsumerAcquiredState.class));
+        _queueEntry.removeAcquisitionFromConsumer(consumer);
+        verify(stateChangeListener).stateChanged(eq(_queueEntry),
+                                                 isA(StealableConsumerAcquiredState.class),
+                                                 eq(NON_CONSUMER_ACQUIRED_STATE));
+    }
 
     public void testLocking()
     {
@@ -152,7 +178,7 @@ public abstract class QueueEntryImplTest
                    _queueEntry.isAcquired());
 
         assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
-        assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition());
+        assertTrue("Should be able to unlock locked queue entry", _queueEntry.makeAcquisitionStealable());
         assertFalse("Acquisition should not be able to be removed from the wrong consumer",
                     _queueEntry.removeAcquisitionFromConsumer(consumer2));
         assertTrue("Acquisition should be able to be removed once unlocked",
@@ -169,8 +195,8 @@ public abstract class QueueEntryImplTest
                    _queueEntry.isAcquired());
 
         assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
-        assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
-        assertTrue("Should be able to lock queue entry",_queueEntry.lockAcquisition(consumer));
+        assertTrue("Should be able to unlock locked queue entry",_queueEntry.makeAcquisitionStealable());
+        assertTrue("Should be able to lock queue entry",_queueEntry.makeAcquisitionUnstealable(consumer));
         assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
 
         _queueEntry.delete();
@@ -185,10 +211,10 @@ public abstract class QueueEntryImplTest
         _queueEntry.acquire(consumer1);
         assertTrue("Queue entry should be acquired by consumer1", _queueEntry.acquiredByConsumer());
 
-        assertTrue("Consumer1 relocking should be allowed", _queueEntry.lockAcquisition(consumer1));
-        assertFalse("Consumer2 should not be allowed", _queueEntry.lockAcquisition(consumer2));
+        assertTrue("Consumer1 relocking should be allowed", _queueEntry.makeAcquisitionUnstealable(consumer1));
+        assertFalse("Consumer2 should not be allowed", _queueEntry.makeAcquisitionUnstealable(consumer2));
 
-        _queueEntry.unlockAcquisition();
+        _queueEntry.makeAcquisitionStealable();
 
         assertTrue("Queue entry should still be acquired by consumer1", _queueEntry.acquiredByConsumer());
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Fri Sep  9 14:49:41 2016
@@ -354,13 +354,13 @@ public class StandardQueueTest extends A
         }
 
         @Override
-        public boolean lockAcquisition(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
         {
             return true;
         }
 
         @Override
-        public boolean unlockAcquisition()
+        public boolean makeAcquisitionStealable()
         {
             return true;
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Sep  9 14:49:41 2016
@@ -38,6 +38,8 @@ import org.apache.qpid.server.flow.FlowC
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
@@ -88,17 +90,24 @@ public class ConsumerTarget_0_10 extends
     private long _deferredSizeCredit;
     private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
-    private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+    private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
 
-        public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+        @Override
+        public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState)
         {
-            if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+            if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
             {
                 removeUnacknowledgedMessage(entry);
                 entry.removeStateChangeListener(this);
             }
         }
+
+        private boolean isConsumerAcquiredStateForThis(EntryState state)
+        {
+            return state instanceof ConsumerAcquiredState
+                   && ((ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_10.this;
+        }
     };
 
     public ConsumerTarget_0_10(ServerSession session,
@@ -389,10 +398,6 @@ public class ConsumerTarget_0_10 extends
     @Override
     public void acquisitionRemoved(final MessageInstance entry)
     {
-        if (entry.removeStateChangeListener(_unacknowledgedMessageListener))
-        {
-            removeUnacknowledgedMessage(entry);
-        }
     }
 
     private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
@@ -439,7 +444,7 @@ public class ConsumerTarget_0_10 extends
     void reject(final ConsumerImpl consumer, final MessageInstance entry)
     {
         entry.setRedelivered();
-        if (entry.lockAcquisition(consumer))
+        if (entry.makeAcquisitionUnstealable(consumer))
         {
             entry.routeToAlternate(null, null);
         }
@@ -474,7 +479,7 @@ public class ConsumerTarget_0_10 extends
         final ServerMessage msg = entry.getMessage();
 
         int requeues = 0;
-        if (entry.lockAcquisition(consumer))
+        if (entry.makeAcquisitionUnstealable(consumer))
         {
             requeues = entry.routeToAlternate(new Action<MessageInstance>()
             {

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Sep  9 14:49:41 2016
@@ -533,7 +533,7 @@ public class ServerSession extends Sessi
                             final ConsumerTarget_0_10 target,
                             final MessageInstance entry)
     {
-        if (entry.lockAcquisition(consumer))
+        if (entry.makeAcquisitionUnstealable(consumer))
         {
             _transaction.dequeue(entry.getEnqueueRecord(),
                                  new ServerTransaction.Action()

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Sep  9 14:49:41 2016
@@ -1637,7 +1637,7 @@ public class AMQChannel
             {
                 for(MessageInstance entry : _ackedMessages)
                 {
-                    entry.unlockAcquisition();
+                    entry.makeAcquisitionStealable();
                 }
                 _resendList.addAll(_ackedMessages);
             }
@@ -1795,7 +1795,7 @@ public class AMQChannel
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
             int requeues = 0;
-            if (rejectedQueueEntry.lockAcquisition(rejectedQueueEntry.getAcquiringConsumer()))
+            if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer()))
             {
                 requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
                 {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Sep  9 14:49:41 2016
@@ -34,6 +34,7 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -545,10 +546,6 @@ public abstract class ConsumerTarget_0_8
     @Override
     public void acquisitionRemoved(final MessageInstance node)
     {
-        if (node.removeStateChangeListener(_unacknowledgedMessageListener))
-        {
-            removeUnacknowledgedMessage(node);
-        }
     }
 
     public long getUnacknowledgedBytes()
@@ -561,17 +558,22 @@ public abstract class ConsumerTarget_0_8
         return _unacknowledgedCount.longValue();
     }
 
-    private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+    private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
-
-        public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+        @Override
+        public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState)
         {
-            if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+            if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
             {
                 removeUnacknowledgedMessage(entry);
                 entry.removeStateChangeListener(this);
             }
+        }
 
+        private boolean isConsumerAcquiredStateForThis(EntryState state)
+        {
+            return state instanceof MessageInstance.ConsumerAcquiredState
+                   && ((MessageInstance.ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_8.this;
         }
     };
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Sep  9 14:49:41 2016
@@ -155,7 +155,7 @@ public class UnacknowledgedMessageMapImp
             List<MessageInstance> acknowledged = new ArrayList<>();
             for (MessageInstance instance : ackedMessageMap.values())
             {
-                if (instance.lockAcquisition(instance.getAcquiringConsumer()))
+                if (instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
                 {
                     acknowledged.add(instance);
                 }
@@ -169,7 +169,7 @@ public class UnacknowledgedMessageMapImp
             {
                 instance = remove(deliveryTag);
             }
-            if(instance != null && instance.lockAcquisition(instance.getAcquiringConsumer()))
+            if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
             {
                 return Collections.singleton(instance);
             }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Fri Sep  9 14:49:41 2016
@@ -50,8 +50,8 @@ public class UnacknowledgedMessageMapTes
         map = new UnacknowledgedMessageMapImpl(100);
         msgs = populateMap(map,expectedSize);
         // simulate some messages being ttl expired
-        when(msgs[2].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
-        when(msgs[4].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
+        when(msgs[2].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
+        when(msgs[4].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
 
         assertEquals(expectedSize,map.size());
 
@@ -80,7 +80,7 @@ public class UnacknowledgedMessageMapTes
     private MessageInstance createMessageInstance(final int id)
     {
         MessageInstance instance = mock(MessageInstance.class);
-        when(instance.lockAcquisition(_consumer)).thenReturn(Boolean.TRUE);
+        when(instance.makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.TRUE);
         when(instance.getAcquiringConsumer()).thenReturn(_consumer);
         return instance;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Sep  9 14:49:41 2016
@@ -372,7 +372,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
             if(outcome instanceof Accepted)
             {
-                if (_queueEntry.lockAcquisition(getConsumer()))
+                if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))
                 {
                     txn.dequeue(_queueEntry.getEnqueueRecord(),
                                 new ServerTransaction.Action()

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Fri Sep  9 14:49:41 2016
@@ -1065,13 +1065,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
 
         }
 
         @Override
-        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
             return false;
         }
@@ -1156,13 +1156,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean lockAcquisition(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
         {
             return false;
         }
 
         @Override
-        public boolean unlockAcquisition()
+        public boolean makeAcquisitionStealable()
         {
             return false;
         }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Fri Sep  9 14:49:41 2016
@@ -66,13 +66,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
 
     }
 
     @Override
-    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
         return false;
     }
@@ -157,13 +157,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean lockAcquisition(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
     {
         return false;
     }
 
     @Override
-    public boolean unlockAcquisition()
+    public boolean makeAcquisitionStealable()
     {
         return false;
     }

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java?rev=1760032&r1=1760031&r2=1760032&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java Fri Sep  9 14:49:41 2016
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.client.AMQSession_0_8;
 import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.test.utils.BrokerHolder;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class FlowControlTest extends QpidBrokerTestCase
@@ -70,18 +69,9 @@ public class FlowControlTest extends Qpi
         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = producerSession.createProducer(_queue);
 
-        BytesMessage m1 = producerSession.createBytesMessage();
-        m1.writeBytes(new byte[128]);
-        m1.setIntProperty("msg", 1);
-        producer.send(m1);
-        BytesMessage m2 = producerSession.createBytesMessage();
-        m2.writeBytes(new byte[128]);
-        m2.setIntProperty("msg", 2);
-        producer.send(m2);
-        BytesMessage m3 = producerSession.createBytesMessage();
-        m3.writeBytes(new byte[256]);
-        m3.setIntProperty("msg", 3);
-        producer.send(m3);
+        sendBytesMessage(producerSession, producer, 1, 128);
+        sendBytesMessage(producerSession, producer, 2, 128);
+        sendBytesMessage(producerSession, producer, 3, 256);
 
         producer.close();
         producerSession.close();
@@ -140,18 +130,9 @@ public class FlowControlTest extends Qpi
         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = producerSession.createProducer(_queue);
 
-        BytesMessage m1 = producerSession.createBytesMessage();
-        m1.writeBytes(new byte[128]);
-        m1.setIntProperty("msg", 1);
-        producer.send(m1);
-        BytesMessage m2 = producerSession.createBytesMessage();
-        m2.writeBytes(new byte[256]);
-        m2.setIntProperty("msg", 2);
-        producer.send(m2);
-        BytesMessage m3 = producerSession.createBytesMessage();
-        m3.writeBytes(new byte[128]);
-        m3.setIntProperty("msg", 3);
-        producer.send(m3);
+        sendBytesMessage(producerSession, producer, 1, 128);
+        sendBytesMessage(producerSession, producer, 2, 256);
+        sendBytesMessage(producerSession, producer, 3, 128);
 
         producer.close();
         producerSession.close();
@@ -196,31 +177,47 @@ public class FlowControlTest extends Qpi
 
     }
 
-    public static void main(String args[]) throws Throwable
+    public void testDeliverMessageLargerThanBytesLimit() throws Exception
     {
-        FlowControlTest test = new FlowControlTest();
+        _queue = (Queue) getInitialContext().lookup("queue");
+        Connection connection = getConnection();
+        connection.start();
+
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producerSession.createConsumer(_queue).close();
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        sendBytesMessage(producerSession, producer, 1, 128);
+        sendBytesMessage(producerSession, producer, 2, 256);
+
+        Session consumerSession1 = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession_0_8) consumerSession1).setPrefetchLimits(0, 64);
+        MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+        Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
 
-        int run = 0;
-        while (true)
-        {
-            System.err.println("Test Run:" + ++run);
-            Thread.sleep(1000);
-            BrokerHolder broker = null;
-            try
-            {
-                broker = test.createSpawnedBroker();
-                test.testBasicBytesFlowControl();
-
-                Thread.sleep(1000);
-            }
-            finally
-            {
-                if (broker != null)
-                {
-                    broker.shutdown();
-                }
-            }
-        }
+        Message r2 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNull("Second message incorrectly delivered", r2);
+
+        r1.acknowledge();
+
+        r2 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Wrong messages received", 2, r2.getIntProperty("msg"));
+
+        r2.acknowledge();
+    }
+
+    private void sendBytesMessage(final Session producerSession,
+                                  final MessageProducer producer,
+                                  final int messageId, final int messageSize) throws Exception
+    {
+        BytesMessage message = producerSession.createBytesMessage();
+        message.writeBytes(new byte[messageSize]);
+        message.setIntProperty("msg", messageId);
+        producer.send(message);
     }
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org