You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/12 13:58:15 UTC

svn commit: r1760370 - in /qpid/java/branches/6.0.x: ./ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/ja...

Author: lquack
Date: Mon Sep 12 13:58:15 2016
New Revision: 1760370

URL: http://svn.apache.org/viewvc?rev=1760370&view=rev
Log:
QPID-7417: [Java Broker] Ensure message instance listeners only fire on state change of the associated object

Merged from trunk with command:

svn merge -c 1760032,1760337 ^/qpid/java/trunk

Modified:
    qpid/java/branches/6.0.x/   (props changed)
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
    qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
    qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
    qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 12 13:58:15 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732465,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742339,1742544,1742900,1742926,1743161,1743228,1743383,1743982,1744012-1744013,1744046,1744123,1744157,1744276,1744403,1745424,1745450,1746140,1746273,1747526,1748254,1748723,1748818,1749349,1749399,1749482,1749524,1750359-1750360,1750943,1751433,1754251,1754354,1754392,1754429,1754510,1754550,1755561,1755957,1758628,1758640,1758766,1758964,1759774,1759783
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732465,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742339,1742544,1742900,1742926,1743161,1743228,1743383,1743982,1744012-1744013,1744046,1744123,1744157,1744276,1744403,1745424,1745450,1746140,1746273,1747526,1748254,1748723,1748818,1749349,1749399,1749482,1749524,1750359-1750360,1750943,1751433,1754251,1754354,1754392,1754429,1754510,1754550,1755561,1755957,1758628,1758640,1758766,1758964,1759774,1759783,1760032,1760337
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Sep 12 13:58:15 2016
@@ -220,7 +220,7 @@ public abstract class AbstractConsumerTa
 
                 if (consumer.acquires())
                 {
-                    entry.unlockAcquisition();
+                    entry.makeAcquisitionStealable();
                 }
             }
             finally

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Mon Sep 12 13:58:15 2016
@@ -44,9 +44,9 @@ public interface MessageInstance
 
     void decrementDeliveryCount();
 
-    void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener);
+    void addStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener);
 
-    boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener);
+    boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener);
 
     boolean acquiredByConsumer();
 
@@ -70,9 +70,9 @@ public interface MessageInstance
 
     boolean acquire(ConsumerImpl sub);
 
-    boolean lockAcquisition(final ConsumerImpl consumer);
+    boolean makeAcquisitionUnstealable(final ConsumerImpl consumer);
 
-    boolean unlockAcquisition();
+    boolean makeAcquisitionStealable();
 
     int getMaximumDeliveryCount();
 
@@ -84,7 +84,7 @@ public interface MessageInstance
 
     MessageEnqueueRecord getEnqueueRecord();
 
-    public static enum State
+    enum State
     {
         AVAILABLE,
         ACQUIRED,
@@ -92,7 +92,7 @@ public interface MessageInstance
         DELETED
     }
 
-    public abstract class EntryState
+    abstract class EntryState
     {
         protected EntryState()
         {
@@ -114,7 +114,7 @@ public interface MessageInstance
     }
 
 
-    public final class AvailableState extends EntryState
+    final class AvailableState extends EntryState
     {
 
         public State getState()
@@ -129,7 +129,7 @@ public interface MessageInstance
     }
 
 
-    public final class DequeuedState extends EntryState
+    final class DequeuedState extends EntryState
     {
 
         public State getState()
@@ -144,7 +144,7 @@ public interface MessageInstance
     }
 
 
-    public final class DeletedState extends EntryState
+    final class DeletedState extends EntryState
     {
 
         public State getState()
@@ -158,7 +158,7 @@ public interface MessageInstance
         }
     }
 
-    public final class NonConsumerAcquiredState extends EntryState
+    final class NonConsumerAcquiredState extends EntryState
     {
         public State getState()
         {
@@ -171,76 +171,72 @@ public interface MessageInstance
         }
     }
 
-    public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
+    abstract class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
     {
-        private final C _consumer;
-        private final LockedAcquiredState<C> _lockedState;
+        public abstract C getConsumer();
 
-        public ConsumerAcquiredState(C consumer)
+        @Override
+        public final State getState()
         {
-            _consumer = consumer;
-            _lockedState = new LockedAcquiredState<>(this);
+            return State.ACQUIRED;
         }
 
-
-        public State getState()
+        @Override
+        public String toString()
         {
-            return State.ACQUIRED;
+            return "{" + getState().name() + " : " + getConsumer() +"}";
         }
+    }
 
-        public C getConsumer()
+    final class StealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+    {
+        private final C _consumer;
+        private final UnstealableConsumerAcquiredState<C> _unstealableState;
+
+        public StealableConsumerAcquiredState(C consumer)
         {
-            return _consumer;
+            _consumer = consumer;
+            _unstealableState = new UnstealableConsumerAcquiredState<>(this);
         }
 
-        public String toString()
+        @Override
+        public C getConsumer()
         {
-            return "{" + getState().name() + " : " + _consumer +"}";
+            return _consumer;
         }
 
-        public LockedAcquiredState<C> getLockedState()
+        public UnstealableConsumerAcquiredState<C> getUnstealableState()
         {
-            return _lockedState;
+            return _unstealableState;
         }
-
     }
 
-    public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+    final class UnstealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
     {
-        private final ConsumerAcquiredState<C> _acquiredState;
+        private final StealableConsumerAcquiredState<C> _stealableState;
 
-        public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+        public UnstealableConsumerAcquiredState(final StealableConsumerAcquiredState<C> stealableState)
         {
-            _acquiredState = acquiredState;
+            _stealableState = stealableState;
         }
 
         @Override
-        public State getState()
-        {
-            return State.ACQUIRED;
-        }
-
         public C getConsumer()
         {
-            return _acquiredState.getConsumer();
-        }
-
-        public String toString()
-        {
-            return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+            return _stealableState.getConsumer();
         }
 
-        public ConsumerAcquiredState<C> getUnlockedState()
+        public StealableConsumerAcquiredState<C> getStealableState()
         {
-            return _acquiredState;
+            return _stealableState;
         }
     }
 
 
-    final static EntryState AVAILABLE_STATE = new AvailableState();
-    final static EntryState DELETED_STATE = new DeletedState();
-    final static EntryState DEQUEUED_STATE = new DequeuedState();
-    final static EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState();
+    EntryState AVAILABLE_STATE = new AvailableState();
+    EntryState DELETED_STATE = new DeletedState();
+    EntryState DEQUEUED_STATE = new DequeuedState();
+    EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState();
 
     boolean isAvailable();
 

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Mon Sep 12 13:58:15 2016
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -172,7 +174,7 @@ public class DefinedGroupMessageGroupMan
 
             _groupMap.put(groupId, group);
 
-            // there's a small change that the group became empty between the point at which getNextAvailable() was
+            // there's a small chance that the group became empty between the point at which getNextAvailable() was
             // called on the consumer, and when accept message is called... in that case we want to avoid delivering
             // out of order
             if(_resetHelper.isEntryAheadOfConsumer(entry, sub))
@@ -256,7 +258,7 @@ public class DefinedGroupMessageGroupMan
         return groupVal;
     }
 
-    private class GroupStateChangeListener implements StateChangeListener<MessageInstance, MessageInstance.State>
+    private class GroupStateChangeListener implements StateChangeListener<MessageInstance, EntryState>
     {
         private final Group _group;
 
@@ -265,24 +267,20 @@ public class DefinedGroupMessageGroupMan
             _group = group;
         }
 
-        public void stateChanged(final MessageInstance entry,
-                                 final MessageInstance.State oldState,
-                                 final MessageInstance.State newState)
+        @Override
+        public void stateChanged(final MessageInstance entry, final EntryState oldState, final EntryState newState)
         {
             synchronized (DefinedGroupMessageGroupManager.this)
             {
                 if(_group.isValid())
                 {
-                    if(oldState != newState)
+                    if (isConsumerAcquiredStateForThisGroup(newState) && !isConsumerAcquiredStateForThisGroup(oldState))
+                    {
+                        _group.add();
+                    }
+                    else if (isConsumerAcquiredStateForThisGroup(oldState) && !isConsumerAcquiredStateForThisGroup(newState))
                     {
-                        if(newState == QueueEntry.State.ACQUIRED)
-                        {
-                            _group.add();
-                        }
-                        else if(oldState == QueueEntry.State.ACQUIRED)
-                        {
-                            _group.subtract((QueueEntry) entry, newState == MessageInstance.State.AVAILABLE);
-                        }
+                        _group.subtract((QueueEntry) entry, newState.getState() == MessageInstance.State.AVAILABLE);
                     }
                 }
                 else
@@ -291,5 +289,11 @@ public class DefinedGroupMessageGroupMan
                 }
             }
         }
+
+        private boolean isConsumerAcquiredStateForThisGroup(EntryState state)
+        {
+            return state instanceof ConsumerAcquiredState
+                   && ((ConsumerAcquiredState) state).getConsumer() == _group.getConsumer();
+        }
     }
 }

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Mon Sep 12 13:58:15 2016
@@ -47,7 +47,7 @@ public interface QueueConsumer<X extends
 
     boolean resend(QueueEntry e);
 
-    MessageInstance.ConsumerAcquiredState<X> getOwningState();
+    MessageInstance.StealableConsumerAcquiredState<X> getOwningState();
 
     QueueContext getQueueContext();
 

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Mon Sep 12 13:58:15 2016
@@ -70,7 +70,8 @@ class QueueConsumerImpl
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final long _consumerNumber;
     private final long _createTime = System.currentTimeMillis();
-    private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
+    private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl>
+            _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
     private final WaitingOnCreditMessageListener _waitingOnCreditMessageListener = new WaitingOnCreditMessageListener();
     private final boolean _acquires;
     private final boolean _seesRequeues;
@@ -520,7 +521,7 @@ class QueueConsumerImpl
         return _createTime;
     }
 
-    public final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> getOwningState()
+    public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl> getOwningState()
     {
         return _owningState;
     }
@@ -626,7 +627,7 @@ class QueueConsumerImpl
         return _queue.getEventLogger();
     }
 
-    public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, QueueEntry.State>
+    public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, MessageInstance.EntryState>
     {
         private final AtomicReference<MessageInstance> _entry = new AtomicReference<>();
 
@@ -657,7 +658,8 @@ class QueueConsumerImpl
 
         }
 
-        public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
+        @Override
+        public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
         {
             entry.removeStateChangeListener(this);
             _entry.compareAndSet(entry, null);

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Sep 12 13:58:15 2016
@@ -80,7 +80,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile StateChangeListenerEntry<? super QueueEntry, State> _stateChangeListeners;
+    private volatile StateChangeListenerEntry<? super QueueEntry, EntryState> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, StateChangeListenerEntry>
@@ -223,7 +223,7 @@ public abstract class QueueEntryImpl imp
     {
         return acquire(NON_CONSUMER_ACQUIRED_STATE);
     }
-    private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
+    private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, EntryState>
     {
         private final Runnable _task;
         private final AtomicBoolean _run = new AtomicBoolean();
@@ -234,13 +234,13 @@ public abstract class QueueEntryImpl imp
         }
 
         @Override
-        public void stateChanged(final MessageInstance object, final State oldState, final State newState)
+        public void stateChanged(final MessageInstance object, final EntryState oldState, final EntryState newState)
         {
-            if(newState == State.DELETED || newState == State.DEQUEUED)
+            if (newState.equals(DELETED_STATE) || newState.equals(DEQUEUED_STATE))
             {
                 QueueEntryImpl.this.removeStateChangeListener(this);
             }
-            else if(acquireOrSteal(null))
+            else if (acquireOrSteal(null))
             {
                 runTask();
             }
@@ -287,7 +287,7 @@ public abstract class QueueEntryImpl imp
 
         EntryState currentState;
 
-        while((currentState = _state).getState() == State.AVAILABLE)
+        while((currentState = _state).equals(AVAILABLE_STATE))
         {
             if(acquired = _stateUpdater.compareAndSet(this, currentState, state))
             {
@@ -297,7 +297,7 @@ public abstract class QueueEntryImpl imp
 
         if(acquired && _stateChangeListeners != null)
         {
-            notifyStateChange(State.AVAILABLE, State.ACQUIRED);
+            notifyStateChange(AVAILABLE_STATE, state);
         }
 
         return acquired;
@@ -305,7 +305,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquire(ConsumerImpl sub)
     {
-        final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getLockedState());
+        final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -315,33 +315,35 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean lockAcquisition(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
     {
         EntryState state = _state;
-        if(state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState) state).getConsumer() == consumer)
+        if(state instanceof StealableConsumerAcquiredState
+           && ((StealableConsumerAcquiredState) state).getConsumer() == consumer)
         {
-            LockedAcquiredState lockedState = ((ConsumerAcquiredState) state).getLockedState();
-            boolean updated = _stateUpdater.compareAndSet(this, state, lockedState);
+            UnstealableConsumerAcquiredState unstealableState = ((StealableConsumerAcquiredState) state).getUnstealableState();
+            boolean updated = _stateUpdater.compareAndSet(this, state, unstealableState);
             if(updated)
             {
-                notifyStateChange(state.getState(), lockedState.getState());
+                notifyStateChange(state, unstealableState);
             }
             return updated;
         }
-        return state instanceof LockedAcquiredState && ((LockedAcquiredState) state).getConsumer() == consumer;
+        return state instanceof UnstealableConsumerAcquiredState
+               && ((UnstealableConsumerAcquiredState) state).getConsumer() == consumer;
     }
 
     @Override
-    public boolean unlockAcquisition()
+    public boolean makeAcquisitionStealable()
     {
         EntryState state = _state;
-        if(state instanceof LockedAcquiredState)
+        if(state instanceof UnstealableConsumerAcquiredState)
         {
-            ConsumerAcquiredState unlockedState = ((LockedAcquiredState) state).getUnlockedState();
-            boolean updated = _stateUpdater.compareAndSet(this, state, unlockedState);
+            StealableConsumerAcquiredState stealableState = ((UnstealableConsumerAcquiredState) state).getStealableState();
+            boolean updated = _stateUpdater.compareAndSet(this, state, stealableState);
             if(updated)
             {
-                notifyStateChange(state.getState(),unlockedState.getState());
+                notifyStateChange(state, stealableState);
             }
             return updated;
         }
@@ -350,8 +352,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquiredByConsumer()
     {
-
-        return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
+        return _state instanceof ConsumerAcquiredState;
     }
 
     @Override
@@ -359,14 +360,10 @@ public abstract class QueueEntryImpl imp
     {
         ConsumerImpl consumer;
         EntryState state = _state;
-        if(state instanceof ConsumerAcquiredState)
+        if (state instanceof ConsumerAcquiredState)
         {
             consumer = ((ConsumerAcquiredState)state).getConsumer();
         }
-        else if(state instanceof LockedAcquiredState)
-        {
-            consumer = ((LockedAcquiredState)state).getConsumer();
-        }
         else
         {
             consumer = null;
@@ -378,20 +375,22 @@ public abstract class QueueEntryImpl imp
     public boolean isAcquiredBy(ConsumerImpl consumer)
     {
         EntryState state = _state;
-        return (state instanceof ConsumerAcquiredState
-               && ((ConsumerAcquiredState)state).getConsumer() == consumer)
-                || (state instanceof LockedAcquiredState
-                    && ((LockedAcquiredState)state).getConsumer() == consumer);
+        return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer);
     }
 
     @Override
     public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
     {
         EntryState state = _state;
-        if(state instanceof ConsumerAcquiredState
-               && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+        if(state instanceof StealableConsumerAcquiredState
+               && ((StealableConsumerAcquiredState)state).getConsumer() == consumer)
         {
-            return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+            final boolean stateWasChanged = _stateUpdater.compareAndSet(this, state, NON_CONSUMER_ACQUIRED_STATE);
+            if (stateWasChanged)
+            {
+                notifyStateChange(state, NON_CONSUMER_ACQUIRED_STATE);
+            }
+            return stateWasChanged;
         }
         else
         {
@@ -422,7 +421,7 @@ public abstract class QueueEntryImpl imp
 
     private void postRelease(final EntryState previousState)
     {
-        if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState)
+        if (previousState instanceof ConsumerAcquiredState)
         {
             getQueue().decrementUnackedMsgCount(this);
         }
@@ -430,9 +429,9 @@ public abstract class QueueEntryImpl imp
         if(!getQueue().isDeleted())
         {
             getQueue().requeue(this);
-            if(_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
+            if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
             {
-                notifyStateChange(State.ACQUIRED, State.AVAILABLE);
+                notifyStateChange(previousState, AVAILABLE_STATE);
             }
 
         }
@@ -477,19 +476,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public QueueConsumer getDeliveredConsumer()
     {
-        EntryState state = _state;
-        if (state instanceof ConsumerAcquiredState)
-        {
-            return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
-        }
-        else if (state instanceof LockedAcquiredState)
-        {
-            return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
-        }
-        else
-        {
-            return null;
-        }
+        return (QueueConsumer) getAcquiringConsumer();
     }
 
     public void reject()
@@ -535,7 +522,7 @@ public abstract class QueueEntryImpl imp
 
         if(state.getState() == State.ACQUIRED)
         {
-            if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
+            if (state instanceof ConsumerAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount(this);
             }
@@ -543,7 +530,7 @@ public abstract class QueueEntryImpl imp
             getQueue().dequeue(this);
             if(_stateChangeListeners != null)
             {
-                notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+                notifyStateChange(state, DEQUEUED_STATE);
             }
             return true;
         }
@@ -554,12 +541,12 @@ public abstract class QueueEntryImpl imp
 
     }
 
-    private void notifyStateChange(final State oldState, final State newState)
+    private void notifyStateChange(final EntryState oldState, final EntryState newState)
     {
-        StateChangeListenerEntry<? super QueueEntry, State> entry = _listenersUpdater.get(this);
+        StateChangeListenerEntry<? super QueueEntry, EntryState> entry = _listenersUpdater.get(this);
         while(entry != null)
         {
-            StateChangeListener<? super QueueEntry, State> l = entry.getListener();
+            StateChangeListener<? super QueueEntry, EntryState> l = entry.getListener();
             if(l != null)
             {
                 l.stateChanged(this, oldState, newState);
@@ -649,16 +636,16 @@ public abstract class QueueEntryImpl imp
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener)
+    public void addStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener)
     {
-        StateChangeListenerEntry<? super QueueEntry, State> entry = new StateChangeListenerEntry<>(listener);
+        StateChangeListenerEntry<? super QueueEntry, EntryState> entry = new StateChangeListenerEntry<>(listener);
         if(!_listenersUpdater.compareAndSet(this,null, entry))
         {
             _listenersUpdater.get(this).add(entry);
         }
     }
 
-    public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener)
     {
         StateChangeListenerEntry entry = _listenersUpdater.get(this);
         if(entry != null)

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java Mon Sep 12 13:58:15 2016
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.util;
 
-public interface StateChangeListener<T, E extends Enum>
+public interface StateChangeListener<T, E>
 {
     void stateChanged(T object, E oldState, E newState);
 }

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java Mon Sep 12 13:58:15 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.util;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public class StateChangeListenerEntry<T, E extends Enum>
+public class StateChangeListenerEntry<T, E>
 {
     private static final AtomicReferenceFieldUpdater<StateChangeListenerEntry, StateChangeListenerEntry> NEXT =
             AtomicReferenceFieldUpdater.newUpdater(StateChangeListenerEntry.class, StateChangeListenerEntry.class, "_next");

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Sep 12 13:58:15 2016
@@ -357,13 +357,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
 
         }
 
         @Override
-        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
             return false;
         }
@@ -448,13 +448,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean lockAcquisition(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
         {
             return false;
         }
 
         @Override
-        public boolean unlockAcquisition()
+        public boolean makeAcquisitionStealable()
         {
             return false;
         }

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Sep 12 13:58:15 2016
@@ -413,12 +413,12 @@ abstract class AbstractQueueTestBase ext
         QueueEntry queueEntry = queueEntries.get(0);
 
         final CountDownLatch dequeueIndicator = new CountDownLatch(1);
-        queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
+        queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
         {
             @Override
-            public void stateChanged(MessageInstance object, MessageInstance.State oldState, MessageInstance.State newState)
+            public void stateChanged(MessageInstance object, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
             {
-                if (newState == MessageInstance.State.DEQUEUED)
+                if (newState.equals(MessageInstance.DEQUEUED_STATE))
                 {
                     dequeueIndicator.countDown();
                 }

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java Mon Sep 12 13:58:15 2016
@@ -55,7 +55,8 @@ public class ConsumerListTest extends Qp
     private QueueConsumer newMockConsumer()
     {
         QueueConsumer consumer = mock(QueueConsumer.class);
-        MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+        MessageInstance.StealableConsumerAcquiredState
+                owningState = new MessageInstance.StealableConsumerAcquiredState(consumer);
         when(consumer.getOwningState()).thenReturn(owningState);
 
         return consumer;

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Mon Sep 12 13:58:15 2016
@@ -101,13 +101,13 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public boolean lockAcquisition(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
     {
         return false;
     }
 
     @Override
-    public boolean unlockAcquisition()
+    public boolean makeAcquisitionStealable()
     {
         return false;
     }
@@ -214,13 +214,13 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
 
     }
 
     @Override
-    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
         return false;
     }

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Mon Sep 12 13:58:15 2016
@@ -18,8 +18,12 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.apache.qpid.server.message.MessageInstance.NON_CONSUMER_ACQUIRED_STATE;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.lang.reflect.Field;
@@ -33,6 +37,8 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstance.StealableConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.UnstealableConsumerAcquiredState;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.BrokerModel;
@@ -41,6 +47,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -135,12 +142,31 @@ public abstract class QueueEntryImplTest
     {
         final QueueConsumer consumer = mock(QueueConsumer.class);
 
-        MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+        StealableConsumerAcquiredState
+                owningState = new StealableConsumerAcquiredState(consumer);
         when(consumer.getOwningState()).thenReturn(owningState);
         when(consumer.getConsumerNumber()).thenReturn(_consumerId++);
         return consumer;
     }
 
+    public void testStateChanges()
+    {
+        QueueConsumer consumer = newConsumer();
+        StateChangeListener<MessageInstance, EntryState> stateChangeListener = mock(StateChangeListener.class);
+        _queueEntry.addStateChangeListener(stateChangeListener);
+        _queueEntry.acquire(consumer);
+        verify(stateChangeListener).stateChanged(eq(_queueEntry),
+                                                 eq(MessageInstance.AVAILABLE_STATE),
+                                                 isA(UnstealableConsumerAcquiredState.class));
+        _queueEntry.makeAcquisitionStealable();
+        verify(stateChangeListener).stateChanged(eq(_queueEntry),
+                                                 isA(UnstealableConsumerAcquiredState.class),
+                                                 isA(StealableConsumerAcquiredState.class));
+        _queueEntry.removeAcquisitionFromConsumer(consumer);
+        verify(stateChangeListener).stateChanged(eq(_queueEntry),
+                                                 isA(StealableConsumerAcquiredState.class),
+                                                 eq(NON_CONSUMER_ACQUIRED_STATE));
+    }
 
     public void testLocking()
     {
@@ -152,7 +178,7 @@ public abstract class QueueEntryImplTest
                    _queueEntry.isAcquired());
 
         assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
-        assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition());
+        assertTrue("Should be able to unlock locked queue entry", _queueEntry.makeAcquisitionStealable());
         assertFalse("Acquisition should not be able to be removed from the wrong consumer",
                     _queueEntry.removeAcquisitionFromConsumer(consumer2));
         assertTrue("Acquisition should be able to be removed once unlocked",
@@ -169,8 +195,8 @@ public abstract class QueueEntryImplTest
                    _queueEntry.isAcquired());
 
         assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
-        assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
-        assertTrue("Should be able to lock queue entry",_queueEntry.lockAcquisition(consumer));
+        assertTrue("Should be able to unlock locked queue entry",_queueEntry.makeAcquisitionStealable());
+        assertTrue("Should be able to lock queue entry",_queueEntry.makeAcquisitionUnstealable(consumer));
         assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
 
         _queueEntry.delete();
@@ -185,10 +211,10 @@ public abstract class QueueEntryImplTest
         _queueEntry.acquire(consumer1);
         assertTrue("Queue entry should be acquired by consumer1", _queueEntry.acquiredByConsumer());
 
-        assertTrue("Consumer1 relocking should be allowed", _queueEntry.lockAcquisition(consumer1));
-        assertFalse("Consumer2 should not be allowed", _queueEntry.lockAcquisition(consumer2));
+        assertTrue("Consumer1 relocking should be allowed", _queueEntry.makeAcquisitionUnstealable(consumer1));
+        assertFalse("Consumer2 should not be allowed", _queueEntry.makeAcquisitionUnstealable(consumer2));
 
-        _queueEntry.unlockAcquisition();
+        _queueEntry.makeAcquisitionStealable();
 
         assertTrue("Queue entry should still be acquired by consumer1", _queueEntry.acquiredByConsumer());
 

Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Mon Sep 12 13:58:15 2016
@@ -341,13 +341,13 @@ public class StandardQueueTest extends A
         }
 
         @Override
-        public boolean lockAcquisition(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
         {
             return true;
         }
 
         @Override
-        public boolean unlockAcquisition()
+        public boolean makeAcquisitionStealable()
         {
             return true;
         }

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Sep 12 13:58:15 2016
@@ -39,6 +39,8 @@ import org.apache.qpid.server.flow.FlowC
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.plugin.MessageConverter;
@@ -89,17 +91,24 @@ public class ConsumerTarget_0_10 extends
     private long _deferredSizeCredit;
     private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
 
-    private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+    private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
 
-        public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+        @Override
+        public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState)
         {
-            if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+            if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
             {
                 removeUnacknowledgedMessage(entry);
                 entry.removeStateChangeListener(this);
             }
         }
+
+        private boolean isConsumerAcquiredStateForThis(EntryState state)
+        {
+            return state instanceof ConsumerAcquiredState
+                   && ((ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_10.this;
+        }
     };
 
     public ConsumerTarget_0_10(ServerSession session,
@@ -383,10 +392,6 @@ public class ConsumerTarget_0_10 extends
     @Override
     public void acquisitionRemoved(final MessageInstance entry)
     {
-        if (entry.removeStateChangeListener(_unacknowledgedMessageListener))
-        {
-            removeUnacknowledgedMessage(entry);
-        }
     }
 
     private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
@@ -433,7 +438,7 @@ public class ConsumerTarget_0_10 extends
     void reject(final ConsumerImpl consumer, final MessageInstance entry)
     {
         entry.setRedelivered();
-        if (entry.lockAcquisition(consumer))
+        if (entry.makeAcquisitionUnstealable(consumer))
         {
             entry.routeToAlternate(null, null);
         }
@@ -468,7 +473,7 @@ public class ConsumerTarget_0_10 extends
         final ServerMessage msg = entry.getMessage();
 
         int requeues = 0;
-        if (entry.lockAcquisition(consumer))
+        if (entry.makeAcquisitionUnstealable(consumer))
         {
             requeues = entry.routeToAlternate(new Action<MessageInstance>()
             {

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Sep 12 13:58:15 2016
@@ -527,7 +527,7 @@ public class ServerSession extends Sessi
                             final ConsumerTarget_0_10 target,
                             final MessageInstance entry)
     {
-        if (entry.lockAcquisition(consumer))
+        if (entry.makeAcquisitionUnstealable(consumer))
         {
             _transaction.dequeue(entry.getEnqueueRecord(),
                                  new ServerTransaction.Action()

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Sep 12 13:58:15 2016
@@ -1614,7 +1614,7 @@ public class AMQChannel
             {
                 for(MessageInstance entry : _ackedMessages)
                 {
-                    entry.unlockAcquisition();
+                    entry.makeAcquisitionStealable();
                 }
                 _resendList.addAll(_ackedMessages);
             }
@@ -1778,7 +1778,7 @@ public class AMQChannel
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
             int requeues = 0;
-            if (rejectedQueueEntry.lockAcquisition(rejectedQueueEntry.getAcquiringConsumer()))
+            if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer()))
             {
                 requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
                 {

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Sep 12 13:58:15 2016
@@ -34,6 +34,7 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -528,10 +529,6 @@ public abstract class ConsumerTarget_0_8
     @Override
     public void acquisitionRemoved(final MessageInstance node)
     {
-        if (node.removeStateChangeListener(_unacknowledgedMessageListener))
-        {
-            removeUnacknowledgedMessage(node);
-        }
     }
 
     public long getUnacknowledgedBytes()
@@ -544,17 +541,22 @@ public abstract class ConsumerTarget_0_8
         return _unacknowledgedCount.longValue();
     }
 
-    private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+    private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
-
-        public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+        @Override
+        public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState)
         {
-            if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+            if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
             {
                 removeUnacknowledgedMessage(entry);
                 entry.removeStateChangeListener(this);
             }
+        }
 
+        private boolean isConsumerAcquiredStateForThis(EntryState state)
+        {
+            return state instanceof MessageInstance.ConsumerAcquiredState
+                   && ((MessageInstance.ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_8.this;
         }
     };
 }

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Mon Sep 12 13:58:15 2016
@@ -155,7 +155,7 @@ public class UnacknowledgedMessageMapImp
             List<MessageInstance> acknowledged = new ArrayList<>();
             for (MessageInstance instance : ackedMessageMap.values())
             {
-                if (instance.lockAcquisition(instance.getAcquiringConsumer()))
+                if (instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
                 {
                     acknowledged.add(instance);
                 }
@@ -169,7 +169,7 @@ public class UnacknowledgedMessageMapImp
             {
                 instance = remove(deliveryTag);
             }
-            if(instance != null && instance.lockAcquisition(instance.getAcquiringConsumer()))
+            if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
             {
                 return Collections.singleton(instance);
             }

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Mon Sep 12 13:58:15 2016
@@ -50,8 +50,8 @@ public class UnacknowledgedMessageMapTes
         map = new UnacknowledgedMessageMapImpl(100);
         msgs = populateMap(map,expectedSize);
         // simulate some messages being ttl expired
-        when(msgs[2].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
-        when(msgs[4].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
+        when(msgs[2].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
+        when(msgs[4].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
 
         assertEquals(expectedSize,map.size());
 
@@ -80,7 +80,7 @@ public class UnacknowledgedMessageMapTes
     private MessageInstance createMessageInstance(final int id)
     {
         MessageInstance instance = mock(MessageInstance.class);
-        when(instance.lockAcquisition(_consumer)).thenReturn(Boolean.TRUE);
+        when(instance.makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.TRUE);
         when(instance.getAcquiringConsumer()).thenReturn(_consumer);
         return instance;
     }

Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Sep 12 13:58:15 2016
@@ -388,7 +388,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
             if(outcome instanceof Accepted)
             {
-                if (_queueEntry.lockAcquisition(getConsumer()))
+                if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))
                 {
                     txn.dequeue(_queueEntry.getEnqueueRecord(),
                                 new ServerTransaction.Action()

Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Mon Sep 12 13:58:15 2016
@@ -1027,13 +1027,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
 
         }
 
         @Override
-        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+        public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
         {
             return false;
         }
@@ -1118,13 +1118,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean lockAcquisition(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
         {
             return false;
         }
 
         @Override
-        public boolean unlockAcquisition()
+        public boolean makeAcquisitionStealable()
         {
             return false;
         }

Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Mon Sep 12 13:58:15 2016
@@ -66,13 +66,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
 
     }
 
     @Override
-    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+    public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
     {
         return false;
     }
@@ -157,13 +157,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean lockAcquisition(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
     {
         return false;
     }
 
     @Override
-    public boolean unlockAcquisition()
+    public boolean makeAcquisitionStealable()
     {
         return false;
     }

Modified: qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Mon Sep 12 13:58:15 2016
@@ -93,7 +93,7 @@ public class QpidBrokerTestCase extends
     protected static final Logger _logger = LoggerFactory.getLogger(QpidBrokerTestCase.class);
     protected static final int LOGMONITOR_TIMEOUT = 5000;
 
-    protected long RECEIVE_TIMEOUT = Long.getLong("qpid.test_receive_timeout", 1000l);
+    protected static long RECEIVE_TIMEOUT = Long.getLong("qpid.test_receive_timeout", 1000l);
 
 
     private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>();

Modified: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java Mon Sep 12 13:58:15 2016
@@ -69,18 +69,9 @@ public class FlowControlTest extends Qpi
         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = producerSession.createProducer(_queue);
 
-        BytesMessage m1 = producerSession.createBytesMessage();
-        m1.writeBytes(new byte[128]);
-        m1.setIntProperty("msg", 1);
-        producer.send(m1);
-        BytesMessage m2 = producerSession.createBytesMessage();
-        m2.writeBytes(new byte[128]);
-        m2.setIntProperty("msg", 2);
-        producer.send(m2);
-        BytesMessage m3 = producerSession.createBytesMessage();
-        m3.writeBytes(new byte[256]);
-        m3.setIntProperty("msg", 3);
-        producer.send(m3);
+        sendBytesMessage(producerSession, producer, 1, 128);
+        sendBytesMessage(producerSession, producer, 2, 128);
+        sendBytesMessage(producerSession, producer, 3, 256);
 
         producer.close();
         producerSession.close();
@@ -139,18 +130,9 @@ public class FlowControlTest extends Qpi
         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = producerSession.createProducer(_queue);
 
-        BytesMessage m1 = producerSession.createBytesMessage();
-        m1.writeBytes(new byte[128]);
-        m1.setIntProperty("msg", 1);
-        producer.send(m1);
-        BytesMessage m2 = producerSession.createBytesMessage();
-        m2.writeBytes(new byte[256]);
-        m2.setIntProperty("msg", 2);
-        producer.send(m2);
-        BytesMessage m3 = producerSession.createBytesMessage();
-        m3.writeBytes(new byte[128]);
-        m3.setIntProperty("msg", 3);
-        producer.send(m3);
+        sendBytesMessage(producerSession, producer, 1, 128);
+        sendBytesMessage(producerSession, producer, 2, 256);
+        sendBytesMessage(producerSession, producer, 3, 128);
 
         producer.close();
         producerSession.close();
@@ -195,27 +177,47 @@ public class FlowControlTest extends Qpi
 
     }
 
-    public static void main(String args[]) throws Throwable
+    public void testDeliverMessageLargerThanBytesLimit() throws Exception
     {
-        FlowControlTest test = new FlowControlTest();
+        _queue = (Queue) getInitialContext().lookup("queue");
+        Connection connection = getConnection();
+        connection.start();
+
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producerSession.createConsumer(_queue).close();
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        sendBytesMessage(producerSession, producer, 1, 128);
+        sendBytesMessage(producerSession, producer, 2, 256);
+
+        Session consumerSession1 = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession_0_8) consumerSession1).setPrefetchLimits(0, 64);
+        MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+        Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
 
-        int run = 0;
-        while (true)
-        {
-            System.err.println("Test Run:" + ++run);
-            Thread.sleep(1000);
-            try
-            {
-                test.startBroker();
-                test.testBasicBytesFlowControl();
-
-                Thread.sleep(1000);
-            }
-            finally
-            {
-                test.stopBroker();
-            }
-        }
+        Message r2 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNull("Second message incorrectly delivered", r2);
+
+        r1.acknowledge();
+
+        r2 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Wrong messages received", 2, r2.getIntProperty("msg"));
+
+        r2.acknowledge();
+    }
+
+    private void sendBytesMessage(final Session producerSession,
+                                  final MessageProducer producer,
+                                  final int messageId, final int messageSize) throws Exception
+    {
+        BytesMessage message = producerSession.createBytesMessage();
+        message.writeBytes(new byte[messageSize]);
+        message.setIntProperty("msg", messageId);
+        producer.send(message);
     }
 }
 

Modified: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Mon Sep 12 13:58:15 2016
@@ -20,14 +20,15 @@
  */
 package org.apache.qpid.test.unit.transacted;
 
-import org.apache.qpid.client.RejectBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -37,8 +38,19 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 /**
  * This class tests a number of commits and roll back scenarios
@@ -50,8 +62,8 @@ import java.util.concurrent.TimeUnit;
 public class CommitRollbackTest extends QpidBrokerTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
-    private static final int POSITIVE_TIMEOUT = 2000;
-    private static final int NEGATIVE_TIMEOUT = 250;
+    private static final long POSITIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_TIMEOUT;
+    private static final long NEGATIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_TIMEOUT;
 
     protected AMQConnection _conn;
     private Session _session;
@@ -472,7 +484,7 @@ public class CommitRollbackTest extends
 
         _logger.info("Receiving messages");
 
-        Message result = _consumer.receive(POSITIVE_TIMEOUT);;
+        Message result = _consumer.receive(POSITIVE_TIMEOUT);
         assertNotNull("Message expected", result);
         // Expect the first message
         assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
@@ -502,6 +514,106 @@ public class CommitRollbackTest extends
         }
     }
 
+    public void testRollbackSoak() throws Exception
+    {
+        newConnection();
+        final int rollbackTime = 2000;
+        final int numberOfMessages = 1000;
+        final int numberOfConsumers = 2;
+        final long testTimeout = numberOfMessages * POSITIVE_TIMEOUT / numberOfConsumers;
+        sendMessage(_pubSession, _jmsQueue, numberOfMessages);
+
+        List<ListenableFuture<Void >> consumerFutures = new ArrayList<>(numberOfConsumers);
+        final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers));
+
+        try
+        {
+            final CountDownLatch modeFlippedLatch = new CountDownLatch(1);
+            final AtomicInteger counter = new AtomicInteger();
+            final AtomicInteger rollbackCounter = new AtomicInteger();
+            final long flipModeTime = System.currentTimeMillis() + rollbackTime;
+            final AtomicBoolean shutdown = new AtomicBoolean();
+
+            for (int i = 0; i < numberOfConsumers; ++i)
+            {
+                consumerFutures.add(threadPool.submit(new Callable<Void>()
+                {
+                    @Override
+                    public Void call() throws Exception
+                    {
+                        Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+                        final MessageConsumer consumer = session.createConsumer(_jmsQueue);
+
+                        while(!shutdown.get())
+                        {
+                            Message m = consumer.receive(POSITIVE_TIMEOUT);
+                            if (m != null)
+                            {
+                                long currentTime = System.currentTimeMillis();
+                                if (currentTime < flipModeTime)
+                                {
+                                    session.rollback();
+                                    rollbackCounter.incrementAndGet();
+                                }
+                                else
+                                {
+                                    modeFlippedLatch.countDown();
+                                    counter.incrementAndGet();
+                                    session.commit();
+                                }
+                            }
+
+                            if (counter.get() == numberOfMessages)
+                            {
+                                break;
+                            }
+
+                            if (Thread.currentThread().isInterrupted())
+                            {
+                                break;
+                            }
+                        }
+
+                        return null;
+                    }
+                }));
+            }
+
+            final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(consumerFutures);
+            modeFlippedLatch.await(rollbackTime * 2, TimeUnit.MILLISECONDS);
+            try
+            {
+                combinedFuture.get(testTimeout, TimeUnit.MILLISECONDS);
+                _logger.debug("Performed {} rollbacks, consumed {}/{} messages",
+                              rollbackCounter.get(),
+                              counter.get(),
+                              numberOfMessages);
+            }
+            catch (TimeoutException e)
+            {
+                fail(String.format(
+                        "Test took more than %.1f seconds. All consumers probably starved. Performed %d rollbacks, consumed %d/%d messages",
+                        testTimeout / 1000.,
+                        rollbackCounter.get(),
+                        counter.get(),
+                        numberOfMessages));
+            }
+            finally
+            {
+                shutdown.set(true);
+            }
+            assertEquals(String.format(
+                    "Unexpected number of messages received. Performed %d rollbacks, consumed %d/%d messages",
+                    rollbackCounter.get(),
+                    counter.get(),
+                    numberOfMessages), numberOfMessages, counter.get());
+        }
+        finally
+        {
+            threadPool.shutdownNow();
+            threadPool.awaitTermination(2 * POSITIVE_TIMEOUT, TimeUnit.SECONDS);
+        }
+    }
 
     public void testResendUnseenMessagesAfterRollback() throws Exception
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org