You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/04/05 10:58:56 UTC
svn commit: r1737804 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apach...
Author: kwall
Date: Tue Apr 5 08:58:56 2016
New Revision: 1737804
URL: http://svn.apache.org/viewvc?rev=1737804&view=rev
Log:
QPID-7154: [Java Broker] Ensure that dead letter paths always lock the queue entry acquisition.
Also ensure that queue entry releases on consumer paths release only that consumer's acquisitions.
Added:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/java/trunk/test-profiles/CPPExcludes
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Apr 5 08:58:56 2016
@@ -251,10 +251,7 @@ public abstract class AbstractConsumerTa
while((instance = _queue.poll()) != null)
{
MessageInstance entry = instance.getEntry();
- if(entry.isAcquiredBy(instance.getConsumer()))
- {
- entry.release();
- }
+ entry.release(instance.getConsumer());
instance.release();
}
doCloseInternal();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Tue Apr 5 08:58:56 2016
@@ -250,6 +250,8 @@ public interface MessageInstance
void release();
+ void release(ConsumerImpl release);
+
boolean resend();
void delete();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Tue Apr 5 08:58:56 2016
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -217,6 +218,14 @@ public class LastValueQueueList extends
discardIfReleasedEntryIsNoLongerLatest();
}
+
+ @Override
+ public void release(ConsumerImpl consumer)
+ {
+ super.release(consumer);
+
+ discardIfReleasedEntryIsNoLongerLatest();
+ }
@Override
protected void onDelete()
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Apr 5 08:58:56 2016
@@ -209,7 +209,6 @@ public abstract class QueueEntryImpl imp
{
return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
-
private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
{
private final Runnable _task;
@@ -376,36 +375,49 @@ public abstract class QueueEntryImpl imp
}
}
+ @Override
public void release()
{
EntryState state = _state;
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
+ postRelease(state);
+ }
+ }
- if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
- {
- getQueue().decrementUnackedMsgCount(this);
- }
+ @Override
+ public void release(ConsumerImpl consumer)
+ {
+ EntryState state = _state;
+ if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+ {
+ postRelease(state);
+ }
+ }
- if(!getQueue().isDeleted())
- {
- getQueue().requeue(this);
- if(_stateChangeListeners != null)
- {
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
- }
+ private void postRelease(final EntryState previousState)
+ {
+ if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState)
+ {
+ getQueue().decrementUnackedMsgCount(this);
+ }
- }
- else if(acquire())
+ if(!getQueue().isDeleted())
+ {
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
{
- routeToAlternate(null, null);
+ notifyStateChange(State.ACQUIRED, State.AVAILABLE);
}
- }
+ }
+ else if(acquire())
+ {
+ routeToAlternate(null, null);
+ }
}
-
@Override
public QueueConsumer getDeliveredConsumer()
{
@@ -528,6 +540,11 @@ public abstract class QueueEntryImpl imp
public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
{
+ if (!isAcquired())
+ {
+ throw new IllegalStateException("Illegal queue entry state. " + this + " is not acquired.");
+ }
+
final Queue<?> currentQueue = getQueue();
Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
boolean autocommit = txn == null;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Tue Apr 5 08:58:56 2016
@@ -504,6 +504,12 @@ public abstract class AbstractSystemMess
}
@Override
+ public void release(ConsumerImpl consumer)
+ {
+ release();
+ }
+
+ @Override
public boolean resend()
{
return false;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Tue Apr 5 08:58:56 2016
@@ -158,6 +158,11 @@ public class MockMessageInstance impleme
}
@Override
+ public void release(final ConsumerImpl release)
+ {
+ }
+
+ @Override
public boolean resend()
{
return false;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Apr 5 08:58:56 2016
@@ -405,10 +405,9 @@ public class ConsumerTarget_0_10 extends
void reject(final MessageInstance entry)
{
entry.setRedelivered();
- entry.routeToAlternate(null, null);
- if(isAcquiredByConsumer(entry))
+ if (entry.lockAcquisition())
{
- entry.delete();
+ entry.routeToAlternate(null, null);
}
}
@@ -441,7 +440,7 @@ public class ConsumerTarget_0_10 extends
}
else
{
- entry.release();
+ entry.release(entry.getAcquiringConsumer());
}
}
@@ -449,17 +448,20 @@ public class ConsumerTarget_0_10 extends
{
final ServerMessage msg = entry.getMessage();
- int requeues = entry.routeToAlternate(new Action<MessageInstance>()
- {
- @Override
- public void performAction(final MessageInstance requeueEntry)
- {
- getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getOwningResource()
- .getName()));
- }
- }, null);
-
+ int requeues = 0;
+ if (entry.lockAcquisition())
+ {
+ requeues = entry.routeToAlternate(new Action<MessageInstance>()
+ {
+ @Override
+ public void performAction(final MessageInstance requeueEntry)
+ {
+ getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getOwningResource()
+ .getName()));
+ }
+ }, null);
+ }
if (requeues == 0)
{
TransactionLogResource owningResource = entry.getOwningResource();
@@ -586,20 +588,6 @@ public class ConsumerTarget_0_10 extends
return _stopped.get();
}
- public boolean deleteAcquired(MessageInstance entry)
- {
- if(isAcquiredByConsumer(entry))
- {
- acquisitionRemoved(entry);
- entry.delete();
- return true;
- }
- else
- {
- return false;
- }
- }
-
@Override
public void acquisitionRemoved(final MessageInstance entry)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Tue Apr 5 08:58:56 2016
@@ -47,40 +47,17 @@ class ExplicitAcceptDispositionChangeLis
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
- {
- _target.getSessionModel().acknowledge(_target, _entry);
- }
- else
- {
- _logger.debug("MessageAccept received for message which is not been acquired - message may have expired or been removed");
- }
-
+ _target.getSessionModel().acknowledge(_consumer, _target, _entry);
}
public void onRelease(boolean setRedelivered)
{
- if(_target != null && _entry.isAcquiredBy(_consumer))
- {
- _target.release(_entry, setRedelivered);
- }
- else
- {
- _logger.debug("MessageRelease received for message which has not been acquired - message may have expired or been removed");
- }
+ _target.release(_entry, setRedelivered);
}
public void onReject()
{
- if(_target != null && _entry.isAcquiredBy(_consumer))
- {
- _target.reject(_entry);
- }
- else
- {
- _logger.debug("MessageReject received for message which has not been acquired - message may have expired or been removed");
- }
-
+ _target.reject(_entry);
}
public boolean acquire()
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Tue Apr 5 08:58:56 2016
@@ -51,27 +51,13 @@ class ImplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_consumer))
- {
- _target.release(_entry, setRedelivered);
- }
- else
- {
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
- }
+ _target.release(_entry, setRedelivered);
+
}
public void onReject()
{
- if(_entry.isAcquiredBy(_consumer))
- {
- _target.reject(_entry);
- }
- else
- {
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
- }
-
+ _target.reject(_entry);
}
public boolean acquire()
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Tue Apr 5 08:58:56 2016
@@ -58,10 +58,7 @@ public class MessageAcceptCompletionList
{
_sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
- {
- _session.acknowledge(_sub, _entry);
- }
+ _session.acknowledge(_consumer, _sub, _entry);
_session.removeDispositionListener(method);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Apr 5 08:58:56 2016
@@ -76,7 +76,6 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.security.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -524,25 +523,31 @@ public class ServerSession extends Sessi
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
+ public void acknowledge(final ConsumerImpl consumer,
+ final ConsumerTarget_0_10 target,
+ final MessageInstance entry)
{
- _transaction.dequeue(entry.getEnqueueRecord(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
+ if (entry.lockAcquisition())
+ {
+ _transaction.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
{
- sub.deleteAcquired(entry);
- }
- public void onRollback()
- {
- // The client has acknowledge the message and therefore have seen it.
- // In the event of rollback, the message must be marked as redelivered.
- entry.setRedelivered();
- entry.release();
- }
- });
+ public void postCommit()
+ {
+ target.acquisitionRemoved(entry);
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ // The client has acknowledge the message and therefore have seen it.
+ // In the event of rollback, the message must be marked as redelivered.
+ entry.setRedelivered();
+ entry.release(consumer);
+ }
+ });
+ }
}
Collection<ConsumerTarget_0_10> getSubscriptions()
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Apr 5 08:58:56 2016
@@ -951,7 +951,7 @@ public class AMQChannel
* this same channel or to other subscribers.
*
*/
- public void requeue()
+ private void requeue()
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -971,7 +971,7 @@ public class AMQChannel
unacked.setRedelivered();
// Ensure message is released for redelivery
- unacked.release();
+ unacked.release(unacked.getAcquiringConsumer());
}
}
@@ -992,8 +992,7 @@ public class AMQChannel
unacked.setRedelivered();
// Ensure message is released for redelivery
- unacked.release();
-
+ unacked.release(unacked.getAcquiringConsumer());
}
else
{
@@ -1033,7 +1032,7 @@ public class AMQChannel
* Called to resend all outstanding unacknowledged messages to this same channel.
*
*/
- public void resend()
+ private void resend()
{
@@ -1106,7 +1105,7 @@ public class AMQChannel
_unacknowledgedMessageMap.remove(deliveryTag);
message.setRedelivered();
- message.release();
+ message.release(message.getAcquiringConsumer());
}
}
@@ -1120,7 +1119,7 @@ public class AMQChannel
* acknowledges the single message specified by the delivery tag
*
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ private void acknowledgeMessage(long deliveryTag, boolean multiple)
{
Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1253,7 +1252,7 @@ public class AMQChannel
_uncommittedMessages.clear();
}
- public void rollback(Runnable postRollbackTask)
+ private void rollback(Runnable postRollbackTask)
{
// stop all subscriptions
@@ -1282,10 +1281,10 @@ public class AMQChannel
for(MessageInstance entry : _resendList)
{
- ConsumerImpl sub = entry.getDeliveredConsumer();
- if(sub == null || sub.isClosed())
+ ConsumerImpl sub = entry.getAcquiringConsumer();
+ if (sub == null || sub.isClosed())
{
- entry.release();
+ entry.release(sub);
}
else
{
@@ -1623,7 +1622,7 @@ public class AMQChannel
{
for(MessageInstance entry : _ackedMessages)
{
- entry.release();
+ entry.release(entry.getAcquiringConsumer());
}
}
finally
@@ -1764,7 +1763,7 @@ public class AMQChannel
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
- public void deadLetter(long deliveryTag)
+ private void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
@@ -1776,9 +1775,10 @@ public class AMQChannel
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
-
-
- int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
+ int requeues = 0;
+ if (rejectedQueueEntry.lockAcquisition())
+ {
+ requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
public void performAction(final MessageInstance requeueEntry)
@@ -1789,6 +1789,7 @@ public class AMQChannel
.getName()));
}
}, null);
+ }
if(requeues == 0)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Tue Apr 5 08:58:56 2016
@@ -62,10 +62,6 @@ public class ExtractResendAndRequeue imp
_msgToRequeue.put(deliveryTag, message);
}
}
- else
- {
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
// false means continue processing
return false;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Apr 5 08:58:56 2016
@@ -240,13 +240,8 @@ class ConsumerTarget_1_0 extends Abstrac
public void onRollback()
{
- if(entry.isAcquiredBy(getConsumer()))
- {
- entry.release();
- _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
-
-
- }
+ entry.release(getConsumer());
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
}
});
}
@@ -257,7 +252,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
else
{
- entry.release();
+ entry.release(getConsumer());
}
}
}
@@ -429,7 +424,7 @@ class ConsumerTarget_1_0 extends Abstrac
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
_queueEntry.incrementDeliveryCount();
- _queueEntry.release();
+ _queueEntry.release(getConsumer());
}
}
});
@@ -441,7 +436,7 @@ class ConsumerTarget_1_0 extends Abstrac
public void postCommit()
{
- _queueEntry.release();
+ _queueEntry.release(getConsumer());
_link.getEndpoint().settle(_deliveryTag);
}
@@ -459,7 +454,7 @@ class ConsumerTarget_1_0 extends Abstrac
public void postCommit()
{
- _queueEntry.release();
+ _queueEntry.release(getConsumer());
if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
{
_queueEntry.incrementDeliveryCount();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Tue Apr 5 08:58:56 2016
@@ -622,7 +622,7 @@ public class SendingLink_1_0 implements
if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
- queueEntry.release();
+ queueEntry.release(_consumer);
_unsettledMap.remove(deliveryTag);
}
else if(initialUnsettledMap.get(deliveryTag) instanceof Outcome)
@@ -661,12 +661,11 @@ public class SendingLink_1_0 implements
{
public void postCommit()
{
- queueEntry.release();
+ queueEntry.release(_consumer);
}
public void onRollback()
{
- //To change body of implemented methods use File | Settings | File Templates.
}
});
}
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Tue Apr 5 08:58:56 2016
@@ -1174,6 +1174,12 @@ class ManagementNode implements MessageS
}
@Override
+ public void release(final ConsumerImpl release)
+ {
+
+ }
+
+ @Override
public boolean resend()
{
return false;
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Tue Apr 5 08:58:56 2016
@@ -213,6 +213,12 @@ class ManagementResponse implements Mess
}
@Override
+ public void release(final ConsumerImpl release)
+ {
+ release();
+ }
+
+ @Override
public boolean resend()
{
return false;
Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java?rev=1737804&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java Tue Apr 5 08:58:56 2016
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class LiveQueueOperationsTest extends QpidBrokerTestCase
+{
+
+ private static final int MAX_DELIVERY_COUNT = 2;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+ setTestSystemProperty("queue.deadLetterQueueEnabled","true");
+ setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
+
+ // Set client-side flag to allow the server to determine whether messages are
+ // dead-lettered or requeued.
+ if (!isBroker010())
+ {
+ setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
+ }
+
+ super.setUp();
+ }
+
+ public void testClearQueueOperationWithActiveConsumerDlqAll() throws Exception
+ {
+ final String virtualHostName = TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST;
+ RestTestHelper restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+
+ Connection conn = getConnection();
+ conn.start();
+ final Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = getTestQueue();
+ session.createConsumer(queue).close();
+
+ sendMessage(session, queue, 250);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ final CountDownLatch clearQueueLatch = new CountDownLatch(10);
+ final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
+ consumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(final Message message)
+ {
+ try
+ {
+ clearQueueLatch.countDown();
+ session.rollback();
+ }
+ catch (Throwable t)
+ {
+ throwableAtomicReference.set(t);
+ }
+ }
+ });
+
+
+ boolean ready = clearQueueLatch.await(30, TimeUnit.SECONDS);
+ assertTrue("Consumer did not reach expected point within timeout", ready);
+
+ final String queueUrl = "queue/" + virtualHostName + "/" + virtualHostName + "/" + queue.getQueueName();
+
+ String clearOperationUrl = queueUrl + "/clearQueue";
+ restTestHelper.submitRequest(clearOperationUrl, "POST", Collections.<String,Object>emptyMap(), 200);
+
+ int queueDepthMessages = 0;
+ for (int i = 0; i < 20; ++i)
+ {
+ Map<String, Object> statistics = getStatistics(restTestHelper, queueUrl);
+ queueDepthMessages = (int) statistics.get("queueDepthMessages");
+ if (queueDepthMessages == 0)
+ {
+ break;
+ }
+ Thread.sleep(250);
+ }
+ assertEquals("Queue depth did not reach 0 within expected time", 0, queueDepthMessages);
+
+ consumer.close();
+
+ Map<String, Object> statistics = getStatistics(restTestHelper, queueUrl);
+ queueDepthMessages = (int) statistics.get("queueDepthMessages");
+ assertEquals("Unexpected queue depth after consumer close", 0, queueDepthMessages);
+
+ assertNull("Unexpected exception thrown", throwableAtomicReference.get());
+ }
+
+ private Map<String, Object> getStatistics(final RestTestHelper restTestHelper, final String objectUrl) throws Exception
+ {
+ Map<String, Object> object = restTestHelper.getJsonAsSingletonList(objectUrl);
+ return (Map<String, Object>) object.get("statistics");
+ }
+}
Modified: qpid/java/trunk/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/CPPExcludes?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/CPPExcludes (original)
+++ qpid/java/trunk/test-profiles/CPPExcludes Tue Apr 5 08:58:56 2016
@@ -105,6 +105,7 @@ org.apache.qpid.server.logging.actors.*
// REST management is used in this test for validation
org.apache.qpid.server.queue.ModelTest#*
+org.apache.qpid.server.queue.LiveQueueOperationsTest#*
// 0-10 is not supported by the MethodRegistry
org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org