You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/12 13:58:15 UTC
svn commit: r1760370 - in /qpid/java/branches/6.0.x: ./
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/ja...
Author: lquack
Date: Mon Sep 12 13:58:15 2016
New Revision: 1760370
URL: http://svn.apache.org/viewvc?rev=1760370&view=rev
Log:
QPID-7417: [Java Broker] Ensure message instance listeners only fire on state change of the associated object
Merged from trunk with command:
svn merge -c 1760032,1760337 ^/qpid/java/trunk
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 12 13:58:15 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732465,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742339,1742544,1742900,1742926,1743161,1743228,1743383,1743982,1744012-1744013,1744046,1744123,1744157,1744276,1744403,1745424,1745450,1746140,1746273,1747526,1748254,1748723,1748818,1749349,1749399,1749482,1749524,1750359-1750360,1750943,1751433,1754251,1754354,1754392,1754429,1754510,1754550,1755561,1755957,1758628,1758640,1758766,1758964,1759774,1759783
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732465,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742339,1742544,1742900,1742926,1743161,1743228,1743383,1743982,1744012-1744013,1744046,1744123,1744157,1744276,1744403,1745424,1745450,1746140,1746273,1747526,1748254,1748723,1748818,1749349,1749399,1749482,1749524,1750359-1750360,1750943,1751433,1754251,1754354,1754392,1754429,1754510,1754550,1755561,1755957,1758628,1758640,1758766,1758964,1759774,1759783,1760032,1760337
/qpid/trunk/qpid:796646-796653
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Sep 12 13:58:15 2016
@@ -220,7 +220,7 @@ public abstract class AbstractConsumerTa
if (consumer.acquires())
{
- entry.unlockAcquisition();
+ entry.makeAcquisitionStealable();
}
}
finally
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Mon Sep 12 13:58:15 2016
@@ -44,9 +44,9 @@ public interface MessageInstance
void decrementDeliveryCount();
- void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener);
+ void addStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener);
- boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener);
+ boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener);
boolean acquiredByConsumer();
@@ -70,9 +70,9 @@ public interface MessageInstance
boolean acquire(ConsumerImpl sub);
- boolean lockAcquisition(final ConsumerImpl consumer);
+ boolean makeAcquisitionUnstealable(final ConsumerImpl consumer);
- boolean unlockAcquisition();
+ boolean makeAcquisitionStealable();
int getMaximumDeliveryCount();
@@ -84,7 +84,7 @@ public interface MessageInstance
MessageEnqueueRecord getEnqueueRecord();
- public static enum State
+ enum State
{
AVAILABLE,
ACQUIRED,
@@ -92,7 +92,7 @@ public interface MessageInstance
DELETED
}
- public abstract class EntryState
+ abstract class EntryState
{
protected EntryState()
{
@@ -114,7 +114,7 @@ public interface MessageInstance
}
- public final class AvailableState extends EntryState
+ final class AvailableState extends EntryState
{
public State getState()
@@ -129,7 +129,7 @@ public interface MessageInstance
}
- public final class DequeuedState extends EntryState
+ final class DequeuedState extends EntryState
{
public State getState()
@@ -144,7 +144,7 @@ public interface MessageInstance
}
- public final class DeletedState extends EntryState
+ final class DeletedState extends EntryState
{
public State getState()
@@ -158,7 +158,7 @@ public interface MessageInstance
}
}
- public final class NonConsumerAcquiredState extends EntryState
+ final class NonConsumerAcquiredState extends EntryState
{
public State getState()
{
@@ -171,76 +171,72 @@ public interface MessageInstance
}
}
- public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
+ abstract class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
{
- private final C _consumer;
- private final LockedAcquiredState<C> _lockedState;
+ public abstract C getConsumer();
- public ConsumerAcquiredState(C consumer)
+ @Override
+ public final State getState()
{
- _consumer = consumer;
- _lockedState = new LockedAcquiredState<>(this);
+ return State.ACQUIRED;
}
-
- public State getState()
+ @Override
+ public String toString()
{
- return State.ACQUIRED;
+ return "{" + getState().name() + " : " + getConsumer() +"}";
}
+ }
- public C getConsumer()
+ final class StealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+ {
+ private final C _consumer;
+ private final UnstealableConsumerAcquiredState<C> _unstealableState;
+
+ public StealableConsumerAcquiredState(C consumer)
{
- return _consumer;
+ _consumer = consumer;
+ _unstealableState = new UnstealableConsumerAcquiredState<>(this);
}
- public String toString()
+ @Override
+ public C getConsumer()
{
- return "{" + getState().name() + " : " + _consumer +"}";
+ return _consumer;
}
- public LockedAcquiredState<C> getLockedState()
+ public UnstealableConsumerAcquiredState<C> getUnstealableState()
{
- return _lockedState;
+ return _unstealableState;
}
-
}
- public final class LockedAcquiredState<C extends ConsumerImpl> extends EntryState
+ final class UnstealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
{
- private final ConsumerAcquiredState<C> _acquiredState;
+ private final StealableConsumerAcquiredState<C> _stealableState;
- public LockedAcquiredState(final ConsumerAcquiredState<C> acquiredState)
+ public UnstealableConsumerAcquiredState(final StealableConsumerAcquiredState<C> stealableState)
{
- _acquiredState = acquiredState;
+ _stealableState = stealableState;
}
@Override
- public State getState()
- {
- return State.ACQUIRED;
- }
-
public C getConsumer()
{
- return _acquiredState.getConsumer();
- }
-
- public String toString()
- {
- return "{" + getState().name() + " : " + _acquiredState.getConsumer() +"}";
+ return _stealableState.getConsumer();
}
- public ConsumerAcquiredState<C> getUnlockedState()
+ public StealableConsumerAcquiredState<C> getStealableState()
{
- return _acquiredState;
+ return _stealableState;
}
}
- final static EntryState AVAILABLE_STATE = new AvailableState();
- final static EntryState DELETED_STATE = new DeletedState();
- final static EntryState DEQUEUED_STATE = new DequeuedState();
- final static EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState();
+ EntryState AVAILABLE_STATE = new AvailableState();
+ EntryState DELETED_STATE = new DeletedState();
+ EntryState DEQUEUED_STATE = new DequeuedState();
+ EntryState NON_CONSUMER_ACQUIRED_STATE = new NonConsumerAcquiredState();
boolean isAvailable();
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Mon Sep 12 13:58:15 2016
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,7 +174,7 @@ public class DefinedGroupMessageGroupMan
_groupMap.put(groupId, group);
- // there's a small change that the group became empty between the point at which getNextAvailable() was
+ // there's a small chance that the group became empty between the point at which getNextAvailable() was
// called on the consumer, and when accept message is called... in that case we want to avoid delivering
// out of order
if(_resetHelper.isEntryAheadOfConsumer(entry, sub))
@@ -256,7 +258,7 @@ public class DefinedGroupMessageGroupMan
return groupVal;
}
- private class GroupStateChangeListener implements StateChangeListener<MessageInstance, MessageInstance.State>
+ private class GroupStateChangeListener implements StateChangeListener<MessageInstance, EntryState>
{
private final Group _group;
@@ -265,24 +267,20 @@ public class DefinedGroupMessageGroupMan
_group = group;
}
- public void stateChanged(final MessageInstance entry,
- final MessageInstance.State oldState,
- final MessageInstance.State newState)
+ @Override
+ public void stateChanged(final MessageInstance entry, final EntryState oldState, final EntryState newState)
{
synchronized (DefinedGroupMessageGroupManager.this)
{
if(_group.isValid())
{
- if(oldState != newState)
+ if (isConsumerAcquiredStateForThisGroup(newState) && !isConsumerAcquiredStateForThisGroup(oldState))
+ {
+ _group.add();
+ }
+ else if (isConsumerAcquiredStateForThisGroup(oldState) && !isConsumerAcquiredStateForThisGroup(newState))
{
- if(newState == QueueEntry.State.ACQUIRED)
- {
- _group.add();
- }
- else if(oldState == QueueEntry.State.ACQUIRED)
- {
- _group.subtract((QueueEntry) entry, newState == MessageInstance.State.AVAILABLE);
- }
+ _group.subtract((QueueEntry) entry, newState.getState() == MessageInstance.State.AVAILABLE);
}
}
else
@@ -291,5 +289,11 @@ public class DefinedGroupMessageGroupMan
}
}
}
+
+ private boolean isConsumerAcquiredStateForThisGroup(EntryState state)
+ {
+ return state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState) state).getConsumer() == _group.getConsumer();
+ }
}
}
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Mon Sep 12 13:58:15 2016
@@ -47,7 +47,7 @@ public interface QueueConsumer<X extends
boolean resend(QueueEntry e);
- MessageInstance.ConsumerAcquiredState<X> getOwningState();
+ MessageInstance.StealableConsumerAcquiredState<X> getOwningState();
QueueContext getQueueContext();
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Mon Sep 12 13:58:15 2016
@@ -70,7 +70,8 @@ class QueueConsumerImpl
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
private final long _createTime = System.currentTimeMillis();
- private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
+ private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl>
+ _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
private final WaitingOnCreditMessageListener _waitingOnCreditMessageListener = new WaitingOnCreditMessageListener();
private final boolean _acquires;
private final boolean _seesRequeues;
@@ -520,7 +521,7 @@ class QueueConsumerImpl
return _createTime;
}
- public final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> getOwningState()
+ public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl> getOwningState()
{
return _owningState;
}
@@ -626,7 +627,7 @@ class QueueConsumerImpl
return _queue.getEventLogger();
}
- public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, QueueEntry.State>
+ public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, MessageInstance.EntryState>
{
private final AtomicReference<MessageInstance> _entry = new AtomicReference<>();
@@ -657,7 +658,8 @@ class QueueConsumerImpl
}
- public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ @Override
+ public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
{
entry.removeStateChangeListener(this);
_entry.compareAndSet(entry, null);
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Sep 12 13:58:15 2016
@@ -80,7 +80,7 @@ public abstract class QueueEntryImpl imp
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile StateChangeListenerEntry<? super QueueEntry, State> _stateChangeListeners;
+ private volatile StateChangeListenerEntry<? super QueueEntry, EntryState> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, StateChangeListenerEntry>
@@ -223,7 +223,7 @@ public abstract class QueueEntryImpl imp
{
return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
- private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
+ private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, EntryState>
{
private final Runnable _task;
private final AtomicBoolean _run = new AtomicBoolean();
@@ -234,13 +234,13 @@ public abstract class QueueEntryImpl imp
}
@Override
- public void stateChanged(final MessageInstance object, final State oldState, final State newState)
+ public void stateChanged(final MessageInstance object, final EntryState oldState, final EntryState newState)
{
- if(newState == State.DELETED || newState == State.DEQUEUED)
+ if (newState.equals(DELETED_STATE) || newState.equals(DEQUEUED_STATE))
{
QueueEntryImpl.this.removeStateChangeListener(this);
}
- else if(acquireOrSteal(null))
+ else if (acquireOrSteal(null))
{
runTask();
}
@@ -287,7 +287,7 @@ public abstract class QueueEntryImpl imp
EntryState currentState;
- while((currentState = _state).getState() == State.AVAILABLE)
+ while((currentState = _state).equals(AVAILABLE_STATE))
{
if(acquired = _stateUpdater.compareAndSet(this, currentState, state))
{
@@ -297,7 +297,7 @@ public abstract class QueueEntryImpl imp
if(acquired && _stateChangeListeners != null)
{
- notifyStateChange(State.AVAILABLE, State.ACQUIRED);
+ notifyStateChange(AVAILABLE_STATE, state);
}
return acquired;
@@ -305,7 +305,7 @@ public abstract class QueueEntryImpl imp
public boolean acquire(ConsumerImpl sub)
{
- final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getLockedState());
+ final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
if(acquired)
{
_deliveryCountUpdater.compareAndSet(this,-1,0);
@@ -315,33 +315,35 @@ public abstract class QueueEntryImpl imp
}
@Override
- public boolean lockAcquisition(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
{
EntryState state = _state;
- if(state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState) state).getConsumer() == consumer)
+ if(state instanceof StealableConsumerAcquiredState
+ && ((StealableConsumerAcquiredState) state).getConsumer() == consumer)
{
- LockedAcquiredState lockedState = ((ConsumerAcquiredState) state).getLockedState();
- boolean updated = _stateUpdater.compareAndSet(this, state, lockedState);
+ UnstealableConsumerAcquiredState unstealableState = ((StealableConsumerAcquiredState) state).getUnstealableState();
+ boolean updated = _stateUpdater.compareAndSet(this, state, unstealableState);
if(updated)
{
- notifyStateChange(state.getState(), lockedState.getState());
+ notifyStateChange(state, unstealableState);
}
return updated;
}
- return state instanceof LockedAcquiredState && ((LockedAcquiredState) state).getConsumer() == consumer;
+ return state instanceof UnstealableConsumerAcquiredState
+ && ((UnstealableConsumerAcquiredState) state).getConsumer() == consumer;
}
@Override
- public boolean unlockAcquisition()
+ public boolean makeAcquisitionStealable()
{
EntryState state = _state;
- if(state instanceof LockedAcquiredState)
+ if(state instanceof UnstealableConsumerAcquiredState)
{
- ConsumerAcquiredState unlockedState = ((LockedAcquiredState) state).getUnlockedState();
- boolean updated = _stateUpdater.compareAndSet(this, state, unlockedState);
+ StealableConsumerAcquiredState stealableState = ((UnstealableConsumerAcquiredState) state).getStealableState();
+ boolean updated = _stateUpdater.compareAndSet(this, state, stealableState);
if(updated)
{
- notifyStateChange(state.getState(),unlockedState.getState());
+ notifyStateChange(state, stealableState);
}
return updated;
}
@@ -350,8 +352,7 @@ public abstract class QueueEntryImpl imp
public boolean acquiredByConsumer()
{
-
- return (_state instanceof ConsumerAcquiredState) || (_state instanceof LockedAcquiredState);
+ return _state instanceof ConsumerAcquiredState;
}
@Override
@@ -359,14 +360,10 @@ public abstract class QueueEntryImpl imp
{
ConsumerImpl consumer;
EntryState state = _state;
- if(state instanceof ConsumerAcquiredState)
+ if (state instanceof ConsumerAcquiredState)
{
consumer = ((ConsumerAcquiredState)state).getConsumer();
}
- else if(state instanceof LockedAcquiredState)
- {
- consumer = ((LockedAcquiredState)state).getConsumer();
- }
else
{
consumer = null;
@@ -378,20 +375,22 @@ public abstract class QueueEntryImpl imp
public boolean isAcquiredBy(ConsumerImpl consumer)
{
EntryState state = _state;
- return (state instanceof ConsumerAcquiredState
- && ((ConsumerAcquiredState)state).getConsumer() == consumer)
- || (state instanceof LockedAcquiredState
- && ((LockedAcquiredState)state).getConsumer() == consumer);
+ return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer);
}
@Override
public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
{
EntryState state = _state;
- if(state instanceof ConsumerAcquiredState
- && ((ConsumerAcquiredState)state).getConsumer() == consumer)
+ if(state instanceof StealableConsumerAcquiredState
+ && ((StealableConsumerAcquiredState)state).getConsumer() == consumer)
{
- return _stateUpdater.compareAndSet(this,state,NON_CONSUMER_ACQUIRED_STATE);
+ final boolean stateWasChanged = _stateUpdater.compareAndSet(this, state, NON_CONSUMER_ACQUIRED_STATE);
+ if (stateWasChanged)
+ {
+ notifyStateChange(state, NON_CONSUMER_ACQUIRED_STATE);
+ }
+ return stateWasChanged;
}
else
{
@@ -422,7 +421,7 @@ public abstract class QueueEntryImpl imp
private void postRelease(final EntryState previousState)
{
- if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState)
+ if (previousState instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -430,9 +429,9 @@ public abstract class QueueEntryImpl imp
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
- if(_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
+ if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
{
- notifyStateChange(State.ACQUIRED, State.AVAILABLE);
+ notifyStateChange(previousState, AVAILABLE_STATE);
}
}
@@ -477,19 +476,7 @@ public abstract class QueueEntryImpl imp
@Override
public QueueConsumer getDeliveredConsumer()
{
- EntryState state = _state;
- if (state instanceof ConsumerAcquiredState)
- {
- return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
- }
- else if (state instanceof LockedAcquiredState)
- {
- return (QueueConsumer) ((LockedAcquiredState) state).getConsumer();
- }
- else
- {
- return null;
- }
+ return (QueueConsumer) getAcquiringConsumer();
}
public void reject()
@@ -535,7 +522,7 @@ public abstract class QueueEntryImpl imp
if(state.getState() == State.ACQUIRED)
{
- if (state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
+ if (state instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -543,7 +530,7 @@ public abstract class QueueEntryImpl imp
getQueue().dequeue(this);
if(_stateChangeListeners != null)
{
- notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
+ notifyStateChange(state, DEQUEUED_STATE);
}
return true;
}
@@ -554,12 +541,12 @@ public abstract class QueueEntryImpl imp
}
- private void notifyStateChange(final State oldState, final State newState)
+ private void notifyStateChange(final EntryState oldState, final EntryState newState)
{
- StateChangeListenerEntry<? super QueueEntry, State> entry = _listenersUpdater.get(this);
+ StateChangeListenerEntry<? super QueueEntry, EntryState> entry = _listenersUpdater.get(this);
while(entry != null)
{
- StateChangeListener<? super QueueEntry, State> l = entry.getListener();
+ StateChangeListener<? super QueueEntry, EntryState> l = entry.getListener();
if(l != null)
{
l.stateChanged(this, oldState, newState);
@@ -649,16 +636,16 @@ public abstract class QueueEntryImpl imp
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener)
+ public void addStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener)
{
- StateChangeListenerEntry<? super QueueEntry, State> entry = new StateChangeListenerEntry<>(listener);
+ StateChangeListenerEntry<? super QueueEntry, EntryState> entry = new StateChangeListenerEntry<>(listener);
if(!_listenersUpdater.compareAndSet(this,null, entry))
{
_listenersUpdater.get(this).add(entry);
}
}
- public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, EntryState> listener)
{
StateChangeListenerEntry entry = _listenersUpdater.get(this);
if(entry != null)
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java Mon Sep 12 13:58:15 2016
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.util;
-public interface StateChangeListener<T, E extends Enum>
+public interface StateChangeListener<T, E>
{
void stateChanged(T object, E oldState, E newState);
}
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListenerEntry.java Mon Sep 12 13:58:15 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.util;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public class StateChangeListenerEntry<T, E extends Enum>
+public class StateChangeListenerEntry<T, E>
{
private static final AtomicReferenceFieldUpdater<StateChangeListenerEntry, StateChangeListenerEntry> NEXT =
AtomicReferenceFieldUpdater.newUpdater(StateChangeListenerEntry.class, StateChangeListenerEntry.class, "_next");
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Sep 12 13:58:15 2016
@@ -357,13 +357,13 @@ public abstract class AbstractSystemMess
}
@Override
- public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
}
@Override
- public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
return false;
}
@@ -448,13 +448,13 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean lockAcquisition(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
{
return false;
}
@Override
- public boolean unlockAcquisition()
+ public boolean makeAcquisitionStealable()
{
return false;
}
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Sep 12 13:58:15 2016
@@ -413,12 +413,12 @@ abstract class AbstractQueueTestBase ext
QueueEntry queueEntry = queueEntries.get(0);
final CountDownLatch dequeueIndicator = new CountDownLatch(1);
- queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
+ queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
{
@Override
- public void stateChanged(MessageInstance object, MessageInstance.State oldState, MessageInstance.State newState)
+ public void stateChanged(MessageInstance object, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
{
- if (newState == MessageInstance.State.DEQUEUED)
+ if (newState.equals(MessageInstance.DEQUEUED_STATE))
{
dequeueIndicator.countDown();
}
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java Mon Sep 12 13:58:15 2016
@@ -55,7 +55,8 @@ public class ConsumerListTest extends Qp
private QueueConsumer newMockConsumer()
{
QueueConsumer consumer = mock(QueueConsumer.class);
- MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+ MessageInstance.StealableConsumerAcquiredState
+ owningState = new MessageInstance.StealableConsumerAcquiredState(consumer);
when(consumer.getOwningState()).thenReturn(owningState);
return consumer;
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Mon Sep 12 13:58:15 2016
@@ -101,13 +101,13 @@ public class MockMessageInstance impleme
}
@Override
- public boolean lockAcquisition(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
{
return false;
}
@Override
- public boolean unlockAcquisition()
+ public boolean makeAcquisitionStealable()
{
return false;
}
@@ -214,13 +214,13 @@ public class MockMessageInstance impleme
}
@Override
- public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
}
@Override
- public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
return false;
}
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Mon Sep 12 13:58:15 2016
@@ -18,8 +18,12 @@
*/
package org.apache.qpid.server.queue;
+import static org.apache.qpid.server.message.MessageInstance.NON_CONSUMER_ACQUIRED_STATE;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
@@ -33,6 +37,8 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstance.StealableConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.UnstealableConsumerAcquiredState;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.BrokerModel;
@@ -41,6 +47,7 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -135,12 +142,31 @@ public abstract class QueueEntryImplTest
{
final QueueConsumer consumer = mock(QueueConsumer.class);
- MessageInstance.ConsumerAcquiredState owningState = new QueueEntryImpl.ConsumerAcquiredState(consumer);
+ StealableConsumerAcquiredState
+ owningState = new StealableConsumerAcquiredState(consumer);
when(consumer.getOwningState()).thenReturn(owningState);
when(consumer.getConsumerNumber()).thenReturn(_consumerId++);
return consumer;
}
+ public void testStateChanges()
+ {
+ QueueConsumer consumer = newConsumer();
+ StateChangeListener<MessageInstance, EntryState> stateChangeListener = mock(StateChangeListener.class);
+ _queueEntry.addStateChangeListener(stateChangeListener);
+ _queueEntry.acquire(consumer);
+ verify(stateChangeListener).stateChanged(eq(_queueEntry),
+ eq(MessageInstance.AVAILABLE_STATE),
+ isA(UnstealableConsumerAcquiredState.class));
+ _queueEntry.makeAcquisitionStealable();
+ verify(stateChangeListener).stateChanged(eq(_queueEntry),
+ isA(UnstealableConsumerAcquiredState.class),
+ isA(StealableConsumerAcquiredState.class));
+ _queueEntry.removeAcquisitionFromConsumer(consumer);
+ verify(stateChangeListener).stateChanged(eq(_queueEntry),
+ isA(StealableConsumerAcquiredState.class),
+ eq(NON_CONSUMER_ACQUIRED_STATE));
+ }
public void testLocking()
{
@@ -152,7 +178,7 @@ public abstract class QueueEntryImplTest
_queueEntry.isAcquired());
assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
- assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition());
+ assertTrue("Should be able to unlock locked queue entry", _queueEntry.makeAcquisitionStealable());
assertFalse("Acquisition should not be able to be removed from the wrong consumer",
_queueEntry.removeAcquisitionFromConsumer(consumer2));
assertTrue("Acquisition should be able to be removed once unlocked",
@@ -169,8 +195,8 @@ public abstract class QueueEntryImplTest
_queueEntry.isAcquired());
assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
- assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
- assertTrue("Should be able to lock queue entry",_queueEntry.lockAcquisition(consumer));
+ assertTrue("Should be able to unlock locked queue entry",_queueEntry.makeAcquisitionStealable());
+ assertTrue("Should be able to lock queue entry",_queueEntry.makeAcquisitionUnstealable(consumer));
assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
_queueEntry.delete();
@@ -185,10 +211,10 @@ public abstract class QueueEntryImplTest
_queueEntry.acquire(consumer1);
assertTrue("Queue entry should be acquired by consumer1", _queueEntry.acquiredByConsumer());
- assertTrue("Consumer1 relocking should be allowed", _queueEntry.lockAcquisition(consumer1));
- assertFalse("Consumer2 should not be allowed", _queueEntry.lockAcquisition(consumer2));
+ assertTrue("Consumer1 relocking should be allowed", _queueEntry.makeAcquisitionUnstealable(consumer1));
+ assertFalse("Consumer2 should not be allowed", _queueEntry.makeAcquisitionUnstealable(consumer2));
- _queueEntry.unlockAcquisition();
+ _queueEntry.makeAcquisitionStealable();
assertTrue("Queue entry should still be acquired by consumer1", _queueEntry.acquiredByConsumer());
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Mon Sep 12 13:58:15 2016
@@ -341,13 +341,13 @@ public class StandardQueueTest extends A
}
@Override
- public boolean lockAcquisition(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
{
return true;
}
@Override
- public boolean unlockAcquisition()
+ public boolean makeAcquisitionStealable()
{
return true;
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Sep 12 13:58:15 2016
@@ -39,6 +39,8 @@ import org.apache.qpid.server.flow.FlowC
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.plugin.MessageConverter;
@@ -89,17 +91,24 @@ public class ConsumerTarget_0_10 extends
private long _deferredSizeCredit;
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
- private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+ private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
{
- public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+ @Override
+ public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState)
{
- if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+ if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
{
removeUnacknowledgedMessage(entry);
entry.removeStateChangeListener(this);
}
}
+
+ private boolean isConsumerAcquiredStateForThis(EntryState state)
+ {
+ return state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_10.this;
+ }
};
public ConsumerTarget_0_10(ServerSession session,
@@ -383,10 +392,6 @@ public class ConsumerTarget_0_10 extends
@Override
public void acquisitionRemoved(final MessageInstance entry)
{
- if (entry.removeStateChangeListener(_unacknowledgedMessageListener))
- {
- removeUnacknowledgedMessage(entry);
- }
}
private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
@@ -433,7 +438,7 @@ public class ConsumerTarget_0_10 extends
void reject(final ConsumerImpl consumer, final MessageInstance entry)
{
entry.setRedelivered();
- if (entry.lockAcquisition(consumer))
+ if (entry.makeAcquisitionUnstealable(consumer))
{
entry.routeToAlternate(null, null);
}
@@ -468,7 +473,7 @@ public class ConsumerTarget_0_10 extends
final ServerMessage msg = entry.getMessage();
int requeues = 0;
- if (entry.lockAcquisition(consumer))
+ if (entry.makeAcquisitionUnstealable(consumer))
{
requeues = entry.routeToAlternate(new Action<MessageInstance>()
{
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Sep 12 13:58:15 2016
@@ -527,7 +527,7 @@ public class ServerSession extends Sessi
final ConsumerTarget_0_10 target,
final MessageInstance entry)
{
- if (entry.lockAcquisition(consumer))
+ if (entry.makeAcquisitionUnstealable(consumer))
{
_transaction.dequeue(entry.getEnqueueRecord(),
new ServerTransaction.Action()
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Sep 12 13:58:15 2016
@@ -1614,7 +1614,7 @@ public class AMQChannel
{
for(MessageInstance entry : _ackedMessages)
{
- entry.unlockAcquisition();
+ entry.makeAcquisitionStealable();
}
_resendList.addAll(_ackedMessages);
}
@@ -1778,7 +1778,7 @@ public class AMQChannel
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
int requeues = 0;
- if (rejectedQueueEntry.lockAcquisition(rejectedQueueEntry.getAcquiringConsumer()))
+ if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer()))
{
requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Mon Sep 12 13:58:15 2016
@@ -34,6 +34,7 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -528,10 +529,6 @@ public abstract class ConsumerTarget_0_8
@Override
public void acquisitionRemoved(final MessageInstance node)
{
- if (node.removeStateChangeListener(_unacknowledgedMessageListener))
- {
- removeUnacknowledgedMessage(node);
- }
}
public long getUnacknowledgedBytes()
@@ -544,17 +541,22 @@ public abstract class ConsumerTarget_0_8
return _unacknowledgedCount.longValue();
}
- private final StateChangeListener<MessageInstance, MessageInstance.State> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.State>()
+ private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
{
-
- public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
+ @Override
+ public void stateChanged(MessageInstance entry, EntryState oldState, EntryState newState)
{
- if(oldState == MessageInstance.State.ACQUIRED && newState != MessageInstance.State.ACQUIRED)
+ if (isConsumerAcquiredStateForThis(oldState) && !isConsumerAcquiredStateForThis(newState))
{
removeUnacknowledgedMessage(entry);
entry.removeStateChangeListener(this);
}
+ }
+ private boolean isConsumerAcquiredStateForThis(EntryState state)
+ {
+ return state instanceof MessageInstance.ConsumerAcquiredState
+ && ((MessageInstance.ConsumerAcquiredState) state).getConsumer().getTarget() == ConsumerTarget_0_8.this;
}
};
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Mon Sep 12 13:58:15 2016
@@ -155,7 +155,7 @@ public class UnacknowledgedMessageMapImp
List<MessageInstance> acknowledged = new ArrayList<>();
for (MessageInstance instance : ackedMessageMap.values())
{
- if (instance.lockAcquisition(instance.getAcquiringConsumer()))
+ if (instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
{
acknowledged.add(instance);
}
@@ -169,7 +169,7 @@ public class UnacknowledgedMessageMapImp
{
instance = remove(deliveryTag);
}
- if(instance != null && instance.lockAcquisition(instance.getAcquiringConsumer()))
+ if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
{
return Collections.singleton(instance);
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Mon Sep 12 13:58:15 2016
@@ -50,8 +50,8 @@ public class UnacknowledgedMessageMapTes
map = new UnacknowledgedMessageMapImpl(100);
msgs = populateMap(map,expectedSize);
// simulate some messages being ttl expired
- when(msgs[2].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
- when(msgs[4].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
+ when(msgs[2].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
+ when(msgs[4].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
assertEquals(expectedSize,map.size());
@@ -80,7 +80,7 @@ public class UnacknowledgedMessageMapTes
private MessageInstance createMessageInstance(final int id)
{
MessageInstance instance = mock(MessageInstance.class);
- when(instance.lockAcquisition(_consumer)).thenReturn(Boolean.TRUE);
+ when(instance.makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.TRUE);
when(instance.getAcquiringConsumer()).thenReturn(_consumer);
return instance;
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Sep 12 13:58:15 2016
@@ -388,7 +388,7 @@ class ConsumerTarget_1_0 extends Abstrac
if(outcome instanceof Accepted)
{
- if (_queueEntry.lockAcquisition(getConsumer()))
+ if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))
{
txn.dequeue(_queueEntry.getEnqueueRecord(),
new ServerTransaction.Action()
Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Mon Sep 12 13:58:15 2016
@@ -1027,13 +1027,13 @@ class ManagementNode implements MessageS
}
@Override
- public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
}
@Override
- public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
return false;
}
@@ -1118,13 +1118,13 @@ class ManagementNode implements MessageS
}
@Override
- public boolean lockAcquisition(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
{
return false;
}
@Override
- public boolean unlockAcquisition()
+ public boolean makeAcquisitionStealable()
{
return false;
}
Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Mon Sep 12 13:58:15 2016
@@ -66,13 +66,13 @@ class ManagementResponse implements Mess
}
@Override
- public void addStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public void addStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
}
@Override
- public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, State> listener)
+ public boolean removeStateChangeListener(final StateChangeListener<? super MessageInstance, EntryState> listener)
{
return false;
}
@@ -157,13 +157,13 @@ class ManagementResponse implements Mess
}
@Override
- public boolean lockAcquisition(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
{
return false;
}
@Override
- public boolean unlockAcquisition()
+ public boolean makeAcquisitionStealable()
{
return false;
}
Modified: qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Mon Sep 12 13:58:15 2016
@@ -93,7 +93,7 @@ public class QpidBrokerTestCase extends
protected static final Logger _logger = LoggerFactory.getLogger(QpidBrokerTestCase.class);
protected static final int LOGMONITOR_TIMEOUT = 5000;
- protected long RECEIVE_TIMEOUT = Long.getLong("qpid.test_receive_timeout", 1000l);
+ protected static long RECEIVE_TIMEOUT = Long.getLong("qpid.test_receive_timeout", 1000l);
private Map<String, String> _propertiesSetForBroker = new HashMap<String, String>();
Modified: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/client/FlowControlTest.java Mon Sep 12 13:58:15 2016
@@ -69,18 +69,9 @@ public class FlowControlTest extends Qpi
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(_queue);
- BytesMessage m1 = producerSession.createBytesMessage();
- m1.writeBytes(new byte[128]);
- m1.setIntProperty("msg", 1);
- producer.send(m1);
- BytesMessage m2 = producerSession.createBytesMessage();
- m2.writeBytes(new byte[128]);
- m2.setIntProperty("msg", 2);
- producer.send(m2);
- BytesMessage m3 = producerSession.createBytesMessage();
- m3.writeBytes(new byte[256]);
- m3.setIntProperty("msg", 3);
- producer.send(m3);
+ sendBytesMessage(producerSession, producer, 1, 128);
+ sendBytesMessage(producerSession, producer, 2, 128);
+ sendBytesMessage(producerSession, producer, 3, 256);
producer.close();
producerSession.close();
@@ -139,18 +130,9 @@ public class FlowControlTest extends Qpi
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(_queue);
- BytesMessage m1 = producerSession.createBytesMessage();
- m1.writeBytes(new byte[128]);
- m1.setIntProperty("msg", 1);
- producer.send(m1);
- BytesMessage m2 = producerSession.createBytesMessage();
- m2.writeBytes(new byte[256]);
- m2.setIntProperty("msg", 2);
- producer.send(m2);
- BytesMessage m3 = producerSession.createBytesMessage();
- m3.writeBytes(new byte[128]);
- m3.setIntProperty("msg", 3);
- producer.send(m3);
+ sendBytesMessage(producerSession, producer, 1, 128);
+ sendBytesMessage(producerSession, producer, 2, 256);
+ sendBytesMessage(producerSession, producer, 3, 128);
producer.close();
producerSession.close();
@@ -195,27 +177,47 @@ public class FlowControlTest extends Qpi
}
- public static void main(String args[]) throws Throwable
+ public void testDeliverMessageLargerThanBytesLimit() throws Exception
{
- FlowControlTest test = new FlowControlTest();
+ _queue = (Queue) getInitialContext().lookup("queue");
+ Connection connection = getConnection();
+ connection.start();
+
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producerSession.createConsumer(_queue).close();
+ MessageProducer producer = producerSession.createProducer(_queue);
+
+ sendBytesMessage(producerSession, producer, 1, 128);
+ sendBytesMessage(producerSession, producer, 2, 256);
+
+ Session consumerSession1 = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ ((AMQSession_0_8) consumerSession1).setPrefetchLimits(0, 64);
+ MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+ Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+ assertNotNull("First message not received", r1);
+ assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
- int run = 0;
- while (true)
- {
- System.err.println("Test Run:" + ++run);
- Thread.sleep(1000);
- try
- {
- test.startBroker();
- test.testBasicBytesFlowControl();
-
- Thread.sleep(1000);
- }
- finally
- {
- test.stopBroker();
- }
- }
+ Message r2 = recv1.receive(RECEIVE_TIMEOUT);
+ assertNull("Second message incorrectly delivered", r2);
+
+ r1.acknowledge();
+
+ r2 = recv1.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Second message not received", r2);
+ assertEquals("Wrong messages received", 2, r2.getIntProperty("msg"));
+
+ r2.acknowledge();
+ }
+
+ private void sendBytesMessage(final Session producerSession,
+ final MessageProducer producer,
+ final int messageId, final int messageSize) throws Exception
+ {
+ BytesMessage message = producerSession.createBytesMessage();
+ message.writeBytes(new byte[messageSize]);
+ message.setIntProperty("msg", messageId);
+ producer.send(message);
}
}
Modified: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=1760370&r1=1760369&r2=1760370&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Mon Sep 12 13:58:15 2016
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.test.unit.transacted;
-import org.apache.qpid.client.RejectBehaviour;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -37,8 +38,19 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
* This class tests a number of commits and roll back scenarios
@@ -50,8 +62,8 @@ import java.util.concurrent.TimeUnit;
public class CommitRollbackTest extends QpidBrokerTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
- private static final int POSITIVE_TIMEOUT = 2000;
- private static final int NEGATIVE_TIMEOUT = 250;
+ private static final long POSITIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_TIMEOUT;
+ private static final long NEGATIVE_TIMEOUT = QpidBrokerTestCase.RECEIVE_TIMEOUT;
protected AMQConnection _conn;
private Session _session;
@@ -472,7 +484,7 @@ public class CommitRollbackTest extends
_logger.info("Receiving messages");
- Message result = _consumer.receive(POSITIVE_TIMEOUT);;
+ Message result = _consumer.receive(POSITIVE_TIMEOUT);
assertNotNull("Message expected", result);
// Expect the first message
assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
@@ -502,6 +514,106 @@ public class CommitRollbackTest extends
}
}
+ public void testRollbackSoak() throws Exception
+ {
+ newConnection();
+ final int rollbackTime = 2000;
+ final int numberOfMessages = 1000;
+ final int numberOfConsumers = 2;
+ final long testTimeout = numberOfMessages * POSITIVE_TIMEOUT / numberOfConsumers;
+ sendMessage(_pubSession, _jmsQueue, numberOfMessages);
+
+ List<ListenableFuture<Void >> consumerFutures = new ArrayList<>(numberOfConsumers);
+ final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfConsumers));
+
+ try
+ {
+ final CountDownLatch modeFlippedLatch = new CountDownLatch(1);
+ final AtomicInteger counter = new AtomicInteger();
+ final AtomicInteger rollbackCounter = new AtomicInteger();
+ final long flipModeTime = System.currentTimeMillis() + rollbackTime;
+ final AtomicBoolean shutdown = new AtomicBoolean();
+
+ for (int i = 0; i < numberOfConsumers; ++i)
+ {
+ consumerFutures.add(threadPool.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ Session session = _conn.createSession(true, Session.SESSION_TRANSACTED);
+ final MessageConsumer consumer = session.createConsumer(_jmsQueue);
+
+ while(!shutdown.get())
+ {
+ Message m = consumer.receive(POSITIVE_TIMEOUT);
+ if (m != null)
+ {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime < flipModeTime)
+ {
+ session.rollback();
+ rollbackCounter.incrementAndGet();
+ }
+ else
+ {
+ modeFlippedLatch.countDown();
+ counter.incrementAndGet();
+ session.commit();
+ }
+ }
+
+ if (counter.get() == numberOfMessages)
+ {
+ break;
+ }
+
+ if (Thread.currentThread().isInterrupted())
+ {
+ break;
+ }
+ }
+
+ return null;
+ }
+ }));
+ }
+
+ final ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(consumerFutures);
+ modeFlippedLatch.await(rollbackTime * 2, TimeUnit.MILLISECONDS);
+ try
+ {
+ combinedFuture.get(testTimeout, TimeUnit.MILLISECONDS);
+ _logger.debug("Performed {} rollbacks, consumed {}/{} messages",
+ rollbackCounter.get(),
+ counter.get(),
+ numberOfMessages);
+ }
+ catch (TimeoutException e)
+ {
+ fail(String.format(
+ "Test took more than %.1f seconds. All consumers probably starved. Performed %d rollbacks, consumed %d/%d messages",
+ testTimeout / 1000.,
+ rollbackCounter.get(),
+ counter.get(),
+ numberOfMessages));
+ }
+ finally
+ {
+ shutdown.set(true);
+ }
+ assertEquals(String.format(
+ "Unexpected number of messages received. Performed %d rollbacks, consumed %d/%d messages",
+ rollbackCounter.get(),
+ counter.get(),
+ numberOfMessages), numberOfMessages, counter.get());
+ }
+ finally
+ {
+ threadPool.shutdownNow();
+ threadPool.awaitTermination(2 * POSITIVE_TIMEOUT, TimeUnit.SECONDS);
+ }
+ }
public void testResendUnseenMessagesAfterRollback() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org