You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/04/11 14:27:07 UTC
svn commit: r1738576 - in /qpid/java/branches/6.0.x: ./
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/ja...
Author: kwall
Date: Mon Apr 11 12:27:07 2016
New Revision: 1738576
URL: http://svn.apache.org/viewvc?rev=1738576&view=rev
Log:
QPID-7154: [Java Broker] Ensure that dead letter paths always lock the queue entry acquisition
Merged with command:
svn merge -c 1737804,1737984,1738119 ^/qpid/java/trunk
Added:
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
- copied, changed from r1737804, qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/java/branches/6.0.x/test-profiles/CPPExcludes (contents, props changed)
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 11 12:27:07 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737835,1737992,1738231
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737804,1737835,1737984,1737992,1738119,1738231
/qpid/trunk/qpid:796646-796653
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Apr 11 12:27:07 2016
@@ -251,10 +251,7 @@ public abstract class AbstractConsumerTa
while((instance = _queue.poll()) != null)
{
MessageInstance entry = instance.getEntry();
- if(entry.isAcquiredBy(instance.getConsumer()))
- {
- entry.release();
- }
+ entry.release(instance.getConsumer());
instance.release();
}
doCloseInternal();
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Mon Apr 11 12:27:07 2016
@@ -70,7 +70,7 @@ public interface MessageInstance
boolean acquire(ConsumerImpl sub);
- boolean lockAcquisition();
+ boolean lockAcquisition(final ConsumerImpl consumer);
boolean unlockAcquisition();
@@ -250,6 +250,8 @@ public interface MessageInstance
void release();
+ void release(ConsumerImpl release);
+
boolean resend();
void delete();
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Mon Apr 11 12:27:07 2016
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -217,6 +218,14 @@ public class LastValueQueueList extends
discardIfReleasedEntryIsNoLongerLatest();
}
+
+ @Override
+ public void release(ConsumerImpl consumer)
+ {
+ super.release(consumer);
+
+ discardIfReleasedEntryIsNoLongerLatest();
+ }
@Override
protected void onDelete()
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Apr 11 12:27:07 2016
@@ -208,7 +208,6 @@ public abstract class QueueEntryImpl imp
{
return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
-
private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
{
private final Runnable _task;
@@ -291,10 +290,10 @@ public abstract class QueueEntryImpl imp
}
@Override
- public boolean lockAcquisition()
+ public boolean lockAcquisition(final ConsumerImpl consumer)
{
EntryState state = _state;
- if(state instanceof ConsumerAcquiredState)
+ if(state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState) state).getConsumer() == consumer)
{
LockedAcquiredState lockedState = ((ConsumerAcquiredState) state).getLockedState();
boolean updated = _stateUpdater.compareAndSet(this, state, lockedState);
@@ -304,7 +303,7 @@ public abstract class QueueEntryImpl imp
}
return updated;
}
- return state instanceof LockedAcquiredState;
+ return state instanceof LockedAcquiredState && ((LockedAcquiredState) state).getConsumer() == consumer;
}
@Override
@@ -375,36 +374,49 @@ public abstract class QueueEntryImpl imp
}
}
+ @Override
public void release()
{
EntryState state = _state;
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
+ postRelease(state);
+ }
+ }
- if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
- {
- getQueue().decrementUnackedMsgCount(this);
- }
+ @Override
+ public void release(ConsumerImpl consumer)
+ {
+ EntryState state = _state;
+ if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+ {
+ postRelease(state);
+ }
+ }
- if(!getQueue().isDeleted())
- {
- getQueue().requeue(this);
- if(_stateChangeListeners != null)
- {
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
- }
+ private void postRelease(final EntryState previousState)
+ {
+ if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState)
+ {
+ getQueue().decrementUnackedMsgCount(this);
+ }
- }
- else if(acquire())
+ if(!getQueue().isDeleted())
+ {
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
{
- routeToAlternate(null, null);
+ notifyStateChange(State.ACQUIRED, State.AVAILABLE);
}
- }
+ }
+ else if(acquire())
+ {
+ routeToAlternate(null, null);
+ }
}
-
@Override
public QueueConsumer getDeliveredConsumer()
{
@@ -527,6 +539,10 @@ public abstract class QueueEntryImpl imp
public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
{
+ if (!isAcquired())
+ {
+ throw new IllegalStateException("Illegal queue entry state. " + this + " is not acquired.");
+ }
final AMQQueue currentQueue = getQueue();
Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
boolean autocommit = txn == null;
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Mon Apr 11 12:27:07 2016
@@ -448,7 +448,7 @@ public abstract class AbstractSystemMess
}
@Override
- public boolean lockAcquisition()
+ public boolean lockAcquisition(final ConsumerImpl consumer)
{
return false;
}
@@ -504,6 +504,12 @@ public abstract class AbstractSystemMess
}
@Override
+ public void release(ConsumerImpl consumer)
+ {
+ release();
+ }
+
+ @Override
public boolean resend()
{
return false;
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Mon Apr 11 12:27:07 2016
@@ -58,6 +58,7 @@ import org.apache.qpid.server.txn.DtxBra
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
@@ -356,23 +357,33 @@ public class AsynchronousMessageStoreRec
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
- entry.acquire();
-
- branch.dequeue(entry.getEnqueueRecord());
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
+ if (entry.acquire())
{
+ branch.dequeue(entry.getEnqueueRecord());
- public void postCommit()
+ branch.addPostTransactionAction(new ServerTransaction.Action()
{
- entry.delete();
- }
- public void onRollback()
- {
- entry.release();
- }
- });
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ // Should never happen - dtx recovery is always synchronous and occurs before
+ // any other message actors are allowed to act on the virtualhost.
+ throw new ServerScopedRuntimeException(
+ "Distributed transaction dequeue handler failed to acquire " + entry +
+ " during recovery of queue " + queue);
+
+ }
}
else
{
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Mon Apr 11 12:27:07 2016
@@ -53,6 +53,7 @@ import org.apache.qpid.server.txn.DtxBra
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
@@ -331,23 +332,33 @@ public class SynchronousMessageStoreReco
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
- entry.acquire();
-
- branch.dequeue(entry.getEnqueueRecord());
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
+ if (entry.acquire())
{
+ branch.dequeue(entry.getEnqueueRecord());
- public void postCommit()
+ branch.addPostTransactionAction(new ServerTransaction.Action()
{
- entry.delete();
- }
- public void onRollback()
- {
- entry.release();
- }
- });
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ // Should never happen - dtx recovery is always synchronous and occurs before
+ // any other message actors are allowed to act on the virtualhost.
+ throw new ServerScopedRuntimeException(
+ "Distributed transaction dequeue handler failed to acquire " + entry +
+ " during recovery of queue " + queue);
+ }
+
}
else
{
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Mon Apr 11 12:27:07 2016
@@ -101,7 +101,7 @@ public class MockMessageInstance impleme
}
@Override
- public boolean lockAcquisition()
+ public boolean lockAcquisition(final ConsumerImpl consumer)
{
return false;
}
@@ -157,6 +157,11 @@ public class MockMessageInstance impleme
{
}
+ @Override
+ public void release(final ConsumerImpl release)
+ {
+ }
+
@Override
public boolean resend()
{
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Mon Apr 11 12:27:07 2016
@@ -170,13 +170,33 @@ public abstract class QueueEntryImplTest
assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
- assertTrue("Should be able to unlock locked queue entry",_queueEntry.lockAcquisition());
+ assertTrue("Should be able to lock queue entry",_queueEntry.lockAcquisition(consumer));
assertFalse("Acquisition should not be able to be hijacked when locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
_queueEntry.delete();
assertTrue("Locked queue entry should be able to be deleted", _queueEntry.isDeleted());
}
+ public void testLockAcquisitionOwnership()
+ {
+ QueueConsumer consumer1 = newConsumer();
+ QueueConsumer consumer2 = newConsumer();
+
+ _queueEntry.acquire(consumer1);
+ assertTrue("Queue entry should be acquired by consumer1", _queueEntry.acquiredByConsumer());
+
+ assertTrue("Consumer1 relocking should be allowed", _queueEntry.lockAcquisition(consumer1));
+ assertFalse("Consumer2 should not be allowed", _queueEntry.lockAcquisition(consumer2));
+
+ _queueEntry.unlockAcquisition();
+
+ assertTrue("Queue entry should still be acquired by consumer1", _queueEntry.acquiredByConsumer());
+
+ _queueEntry.release(consumer1);
+
+ assertFalse("Queue entry should no longer be acquired by consumer1", _queueEntry.acquiredByConsumer());
+ }
+
/**
* A helper method to get entry state
*
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Mon Apr 11 12:27:07 2016
@@ -341,7 +341,7 @@ public class StandardQueueTest extends A
}
@Override
- public boolean lockAcquisition()
+ public boolean lockAcquisition(final ConsumerImpl consumer)
{
return true;
}
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java Mon Apr 11 12:27:07 2016
@@ -311,8 +311,10 @@ public class SynchronousMessageStoreReco
Transaction.DequeueRecord dequeueRecord = createMockDequeueRecord(queueId, messageId);
QueueEntry queueEntry = mock(QueueEntry.class);
+ when(queueEntry.acquire()).thenReturn(true);
when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
+
final long format = 1;
final byte[] globalId = new byte[] {0};
final byte[] branchId = new byte[] {0};
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Mon Apr 11 12:27:07 2016
@@ -402,13 +402,12 @@ public class ConsumerTarget_0_10 extends
});
}
- void reject(final MessageInstance entry)
+ void reject(final ConsumerImpl consumer, final MessageInstance entry)
{
entry.setRedelivered();
- entry.routeToAlternate(null, null);
- if(isAcquiredByConsumer(entry))
+ if (entry.lockAcquisition(consumer))
{
- entry.delete();
+ entry.routeToAlternate(null, null);
}
}
@@ -423,7 +422,9 @@ public class ConsumerTarget_0_10 extends
return false;
}
- void release(final MessageInstance entry, final boolean setRedelivered)
+ void release(final ConsumerImpl consumer,
+ final MessageInstance entry,
+ final boolean setRedelivered)
{
if (setRedelivered)
{
@@ -437,29 +438,32 @@ public class ConsumerTarget_0_10 extends
if (isMaxDeliveryLimitReached(entry))
{
- sendToDLQOrDiscard(entry);
+ sendToDLQOrDiscard(consumer, entry);
}
else
{
- entry.release();
+ entry.release(consumer);
}
}
- protected void sendToDLQOrDiscard(MessageInstance entry)
+ protected void sendToDLQOrDiscard(final ConsumerImpl consumer, MessageInstance entry)
{
final ServerMessage msg = entry.getMessage();
- int requeues = entry.routeToAlternate(new Action<MessageInstance>()
- {
- @Override
- public void performAction(final MessageInstance requeueEntry)
- {
- getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getOwningResource()
- .getName()));
- }
- }, null);
-
+ int requeues = 0;
+ if (entry.lockAcquisition(consumer))
+ {
+ requeues = entry.routeToAlternate(new Action<MessageInstance>()
+ {
+ @Override
+ public void performAction(final MessageInstance requeueEntry)
+ {
+ getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getOwningResource()
+ .getName()));
+ }
+ }, null);
+ }
if (requeues == 0)
{
TransactionLogResource owningResource = entry.getOwningResource();
@@ -586,20 +590,6 @@ public class ConsumerTarget_0_10 extends
return _stopped.get();
}
- public boolean deleteAcquired(MessageInstance entry)
- {
- if(isAcquiredByConsumer(entry))
- {
- acquisitionRemoved(entry);
- entry.delete();
- return true;
- }
- else
- {
- return false;
- }
- }
-
@Override
public void acquisitionRemoved(final MessageInstance entry)
{
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Mon Apr 11 12:27:07 2016
@@ -47,40 +47,17 @@ class ExplicitAcceptDispositionChangeLis
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
- {
- _target.getSessionModel().acknowledge(_target, _entry);
- }
- else
- {
- _logger.debug("MessageAccept received for message which is not been acquired - message may have expired or been removed");
- }
-
+ _target.getSessionModel().acknowledge(_consumer, _target, _entry);
}
public void onRelease(boolean setRedelivered)
{
- if(_target != null && _entry.isAcquiredBy(_consumer))
- {
- _target.release(_entry, setRedelivered);
- }
- else
- {
- _logger.debug("MessageRelease received for message which has not been acquired - message may have expired or been removed");
- }
+ _target.release(_consumer, _entry, setRedelivered);
}
public void onReject()
{
- if(_target != null && _entry.isAcquiredBy(_consumer))
- {
- _target.reject(_entry);
- }
- else
- {
- _logger.debug("MessageReject received for message which has not been acquired - message may have expired or been removed");
- }
-
+ _target.reject(_consumer, _entry);
}
public boolean acquire()
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Mon Apr 11 12:27:07 2016
@@ -51,27 +51,13 @@ class ImplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_consumer))
- {
- _target.release(_entry, setRedelivered);
- }
- else
- {
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
- }
+ _target.release(_consumer, _entry, setRedelivered);
+
}
public void onReject()
{
- if(_entry.isAcquiredBy(_consumer))
- {
- _target.reject(_entry);
- }
- else
- {
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
- }
-
+ _target.reject(_consumer, _entry);
}
public boolean acquire()
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Mon Apr 11 12:27:07 2016
@@ -58,10 +58,7 @@ public class MessageAcceptCompletionList
{
_sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
- {
- _session.acknowledge(_sub, _entry);
- }
+ _session.acknowledge(_consumer, _sub, _entry);
_session.removeDispositionListener(method);
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Apr 11 12:27:07 2016
@@ -75,7 +75,6 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -524,25 +523,31 @@ public class ServerSession extends Sessi
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
+ public void acknowledge(final ConsumerImpl consumer,
+ final ConsumerTarget_0_10 target,
+ final MessageInstance entry)
{
- _transaction.dequeue(entry.getEnqueueRecord(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
+ if (entry.lockAcquisition(consumer))
+ {
+ _transaction.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
{
- sub.deleteAcquired(entry);
- }
- public void onRollback()
- {
- // The client has acknowledge the message and therefore have seen it.
- // In the event of rollback, the message must be marked as redelivered.
- entry.setRedelivered();
- entry.release();
- }
- });
+ public void postCommit()
+ {
+ target.acquisitionRemoved(entry);
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ // The client has acknowledge the message and therefore have seen it.
+ // In the event of rollback, the message must be marked as redelivered.
+ entry.setRedelivered();
+ entry.release(consumer);
+ }
+ });
+ }
}
Collection<ConsumerTarget_0_10> getSubscriptions()
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Apr 11 12:27:07 2016
@@ -953,7 +953,7 @@ public class AMQChannel
* this same channel or to other subscribers.
*
*/
- public void requeue()
+ private void requeue()
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -973,7 +973,7 @@ public class AMQChannel
unacked.setRedelivered();
// Ensure message is released for redelivery
- unacked.release();
+ unacked.release(unacked.getAcquiringConsumer());
}
}
@@ -994,8 +994,7 @@ public class AMQChannel
unacked.setRedelivered();
// Ensure message is released for redelivery
- unacked.release();
-
+ unacked.release(unacked.getAcquiringConsumer());
}
else
{
@@ -1035,7 +1034,7 @@ public class AMQChannel
* Called to resend all outstanding unacknowledged messages to this same channel.
*
*/
- public void resend()
+ private void resend()
{
@@ -1108,7 +1107,7 @@ public class AMQChannel
_unacknowledgedMessageMap.remove(deliveryTag);
message.setRedelivered();
- message.release();
+ message.release(message.getAcquiringConsumer());
}
}
@@ -1122,7 +1121,7 @@ public class AMQChannel
* acknowledges the single message specified by the delivery tag
*
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ private void acknowledgeMessage(long deliveryTag, boolean multiple)
{
Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1255,7 +1254,7 @@ public class AMQChannel
_uncommittedMessages.clear();
}
- public void rollback(Runnable postRollbackTask)
+ private void rollback(Runnable postRollbackTask)
{
// stop all subscriptions
@@ -1284,10 +1283,10 @@ public class AMQChannel
for(MessageInstance entry : _resendList)
{
- ConsumerImpl sub = entry.getDeliveredConsumer();
- if(sub == null || sub.isClosed())
+ ConsumerImpl sub = entry.getAcquiringConsumer();
+ if (sub == null || sub.isClosed())
{
- entry.release();
+ entry.release(sub);
}
else
{
@@ -1627,7 +1626,7 @@ public class AMQChannel
{
for(MessageInstance entry : _ackedMessages)
{
- entry.release();
+ entry.release(entry.getAcquiringConsumer());
}
}
finally
@@ -1768,7 +1767,7 @@ public class AMQChannel
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
- public void deadLetter(long deliveryTag)
+ private void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
@@ -1780,9 +1779,10 @@ public class AMQChannel
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
-
-
- int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
+ int requeues = 0;
+ if (rejectedQueueEntry.lockAcquisition(rejectedQueueEntry.getAcquiringConsumer()))
+ {
+ requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
public void performAction(final MessageInstance requeueEntry)
@@ -1793,6 +1793,7 @@ public class AMQChannel
.getName()));
}
}, null);
+ }
if(requeues == 0)
{
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Mon Apr 11 12:27:07 2016
@@ -62,10 +62,6 @@ public class ExtractResendAndRequeue imp
_msgToRequeue.put(deliveryTag, message);
}
}
- else
- {
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
// false means continue processing
return false;
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Mon Apr 11 12:27:07 2016
@@ -21,7 +21,6 @@
package org.apache.qpid.server.protocol.v0_8;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -156,7 +155,7 @@ public class UnacknowledgedMessageMapImp
List<MessageInstance> acknowledged = new ArrayList<>();
for (MessageInstance instance : ackedMessageMap.values())
{
- if (instance.lockAcquisition())
+ if (instance.lockAcquisition(instance.getAcquiringConsumer()))
{
acknowledged.add(instance);
}
@@ -170,7 +169,7 @@ public class UnacknowledgedMessageMapImp
{
instance = remove(deliveryTag);
}
- if(instance != null && instance.lockAcquisition())
+ if(instance != null && instance.lockAcquisition(instance.getAcquiringConsumer()))
{
return Collections.singleton(instance);
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Mon Apr 11 12:27:07 2016
@@ -25,11 +25,14 @@ import static org.mockito.Mockito.when;
import java.util.Collection;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.test.utils.QpidTestCase;
public class UnacknowledgedMessageMapTest extends QpidTestCase
{
+ private final ConsumerImpl _consumer = mock(ConsumerImpl.class);
+
public void testDeletedMessagesCantBeAcknowledged()
{
UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100);
@@ -47,8 +50,8 @@ public class UnacknowledgedMessageMapTes
map = new UnacknowledgedMessageMapImpl(100);
msgs = populateMap(map,expectedSize);
// simulate some messages being ttl expired
- when(msgs[2].lockAcquisition()).thenReturn(Boolean.FALSE);
- when(msgs[4].lockAcquisition()).thenReturn(Boolean.FALSE);
+ when(msgs[2].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
+ when(msgs[4].lockAcquisition(_consumer)).thenReturn(Boolean.FALSE);
assertEquals(expectedSize,map.size());
@@ -77,7 +80,8 @@ public class UnacknowledgedMessageMapTes
private MessageInstance createMessageInstance(final int id)
{
MessageInstance instance = mock(MessageInstance.class);
- when(instance.lockAcquisition()).thenReturn(Boolean.TRUE);
+ when(instance.lockAcquisition(_consumer)).thenReturn(Boolean.TRUE);
+ when(instance.getAcquiringConsumer()).thenReturn(_consumer);
return instance;
}
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Apr 11 12:27:07 2016
@@ -240,13 +240,8 @@ class ConsumerTarget_1_0 extends Abstrac
public void onRollback()
{
- if(entry.isAcquiredBy(getConsumer()))
- {
- entry.release();
- _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
-
-
- }
+ entry.release(getConsumer());
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
}
});
}
@@ -257,7 +252,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
else
{
- entry.release();
+ entry.release(getConsumer());
}
}
}
@@ -393,24 +388,26 @@ class ConsumerTarget_1_0 extends Abstrac
if(outcome instanceof Accepted)
{
- _queueEntry.lockAcquisition();
- txn.dequeue(_queueEntry.getEnqueueRecord(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- if(_queueEntry.isAcquiredBy(getConsumer()))
+ if (_queueEntry.lockAcquisition(getConsumer()))
+ {
+ txn.dequeue(_queueEntry.getEnqueueRecord(),
+ new ServerTransaction.Action()
{
- _queueEntry.delete();
- }
- }
- public void onRollback()
- {
+ public void postCommit()
+ {
+ if (_queueEntry.isAcquiredBy(getConsumer()))
+ {
+ _queueEntry.delete();
+ }
+ }
- }
- });
+ public void onRollback()
+ {
+
+ }
+ });
+ }
txn.addPostTransactionAction(new ServerTransaction.Action()
{
public void postCommit()
@@ -429,7 +426,7 @@ class ConsumerTarget_1_0 extends Abstrac
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
_queueEntry.incrementDeliveryCount();
- _queueEntry.release();
+ _queueEntry.release(getConsumer());
}
}
});
@@ -441,7 +438,7 @@ class ConsumerTarget_1_0 extends Abstrac
public void postCommit()
{
- _queueEntry.release();
+ _queueEntry.release(getConsumer());
_link.getEndpoint().settle(_deliveryTag);
}
@@ -459,7 +456,7 @@ class ConsumerTarget_1_0 extends Abstrac
public void postCommit()
{
- _queueEntry.release();
+ _queueEntry.release(getConsumer());
if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
{
_queueEntry.incrementDeliveryCount();
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Apr 11 12:27:07 2016
@@ -621,7 +621,7 @@ public class SendingLink_1_0 implements
if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
- queueEntry.release();
+ queueEntry.release(_consumer);
_unsettledMap.remove(deliveryTag);
}
else if(initialUnsettledMap.get(deliveryTag) instanceof Outcome)
@@ -660,12 +660,11 @@ public class SendingLink_1_0 implements
{
public void postCommit()
{
- queueEntry.release();
+ queueEntry.release(_consumer);
}
public void onRollback()
{
- //To change body of implemented methods use File | Settings | File Templates.
}
});
}
Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Mon Apr 11 12:27:07 2016
@@ -1118,7 +1118,7 @@ class ManagementNode implements MessageS
}
@Override
- public boolean lockAcquisition()
+ public boolean lockAcquisition(final ConsumerImpl consumer)
{
return false;
}
@@ -1172,6 +1172,12 @@ class ManagementNode implements MessageS
{
}
+
+ @Override
+ public void release(final ConsumerImpl release)
+ {
+
+ }
@Override
public boolean resend()
Modified: qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Mon Apr 11 12:27:07 2016
@@ -157,7 +157,7 @@ class ManagementResponse implements Mess
}
@Override
- public boolean lockAcquisition()
+ public boolean lockAcquisition(final ConsumerImpl consumer)
{
return false;
}
@@ -213,6 +213,12 @@ class ManagementResponse implements Mess
}
@Override
+ public void release(final ConsumerImpl release)
+ {
+ release();
+ }
+
+ @Override
public boolean resend()
{
return false;
Copied: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java (from r1737804, qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java?p2=qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java&p1=qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java&r1=1737804&r2=1738576&rev=1738576&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java Mon Apr 11 12:27:07 2016
@@ -35,6 +35,10 @@ import javax.jms.Session;
import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.Port;
import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -47,7 +51,21 @@ public class LiveQueueOperationsTest ext
@Override
protected void setUp() throws Exception
{
- getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.addHttpManagementConfiguration();
+ config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.PORT, getHttpManagementPort(getPort()));
+ config.removeObjectConfiguration(Port.class, TestBrokerConfiguration.ENTRY_NAME_JMX_PORT);
+ config.removeObjectConfiguration(Port.class, TestBrokerConfiguration.ENTRY_NAME_RMI_PORT);
+
+ config.setObjectAttribute(AuthenticationProvider.class, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+ "secureOnlyMechanisms",
+ "{}");
+
+ // set password authentication provider on http port for the tests
+ config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER,
+ TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+ config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true);
+
setTestSystemProperty("queue.deadLetterQueueEnabled","true");
setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
@@ -57,14 +75,15 @@ public class LiveQueueOperationsTest ext
{
setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
}
-
super.setUp();
+
}
public void testClearQueueOperationWithActiveConsumerDlqAll() throws Exception
{
final String virtualHostName = TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST;
- RestTestHelper restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+ RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort()));
+ restTestHelper.setUsernameAndPassword("webadmin", "webadmin");
Connection conn = getConnection();
conn.start();
Modified: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/test-profiles/CPPExcludes?rev=1738576&r1=1738575&r2=1738576&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/test-profiles/CPPExcludes (original)
+++ qpid/java/branches/6.0.x/test-profiles/CPPExcludes Mon Apr 11 12:27:07 2016
@@ -109,6 +109,7 @@ org.apache.qpid.systest.management.jmx.*
// JMX is used in this test for validation
org.apache.qpid.server.queue.ModelTest#*
+org.apache.qpid.server.queue.LiveQueueOperationsTest#*
// 0-10 is not supported by the MethodRegistry
org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
Propchange: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 11 12:27:07 2016
@@ -6,4 +6,4 @@
/qpid/branches/java-broker-vhost-refactor/java/test-profiles/CPPExcludes:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes:1061302-1072333
-/qpid/java/trunk/test-profiles/CPPExcludes:1715446,1732812,1736751,1736838
+/qpid/java/trunk/test-profiles/CPPExcludes:1715446,1732812,1736751,1736838,1737804
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org