You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/04/05 10:58:56 UTC

svn commit: r1737804 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apach...

Author: kwall
Date: Tue Apr  5 08:58:56 2016
New Revision: 1737804

URL: http://svn.apache.org/viewvc?rev=1737804&view=rev
Log:
QPID-7154: [Java Broker] Ensure that dead letter paths always lock the queue entry acquisition

Also ensure that consumer path queue entry releases release only consumer acquisitions

Added:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
    qpid/java/trunk/test-profiles/CPPExcludes

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Apr  5 08:58:56 2016
@@ -251,10 +251,7 @@ public abstract class AbstractConsumerTa
             while((instance = _queue.poll()) != null)
             {
                 MessageInstance entry = instance.getEntry();
-                if(entry.isAcquiredBy(instance.getConsumer()))
-                {
-                    entry.release();
-                }
+                entry.release(instance.getConsumer());
                 instance.release();
             }
             doCloseInternal();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Tue Apr  5 08:58:56 2016
@@ -250,6 +250,8 @@ public interface MessageInstance
 
     void release();
 
+    void release(ConsumerImpl release);
+
     boolean resend();
 
     void delete();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Tue Apr  5 08:58:56 2016
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -217,6 +218,14 @@ public class LastValueQueueList extends
 
             discardIfReleasedEntryIsNoLongerLatest();
         }
+
+        @Override
+        public void release(ConsumerImpl consumer)
+        {
+            super.release(consumer);
+
+            discardIfReleasedEntryIsNoLongerLatest();
+        }
 
         @Override
         protected void onDelete()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Apr  5 08:58:56 2016
@@ -209,7 +209,6 @@ public abstract class QueueEntryImpl imp
     {
         return acquire(NON_CONSUMER_ACQUIRED_STATE);
     }
-
     private class DelayedAcquisitionStateListener implements StateChangeListener<MessageInstance, State>
     {
         private final Runnable _task;
@@ -376,36 +375,49 @@ public abstract class QueueEntryImpl imp
         }
     }
 
+    @Override
     public void release()
     {
         EntryState state = _state;
 
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
+            postRelease(state);
+        }
+    }
 
-            if(state instanceof ConsumerAcquiredState || state instanceof LockedAcquiredState)
-            {
-                getQueue().decrementUnackedMsgCount(this);
-            }
+    @Override
+    public void release(ConsumerImpl consumer)
+    {
+        EntryState state = _state;
+        if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+        {
+            postRelease(state);
+        }
+    }
 
-            if(!getQueue().isDeleted())
-            {
-                getQueue().requeue(this);
-                if(_stateChangeListeners != null)
-                {
-                    notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
-                }
+    private void postRelease(final EntryState previousState)
+    {
+        if(previousState instanceof ConsumerAcquiredState || previousState instanceof LockedAcquiredState)
+        {
+            getQueue().decrementUnackedMsgCount(this);
+        }
 
-            }
-            else if(acquire())
+        if(!getQueue().isDeleted())
+        {
+            getQueue().requeue(this);
+            if(_stateChangeListeners != null)
             {
-                routeToAlternate(null, null);
+                notifyStateChange(State.ACQUIRED, State.AVAILABLE);
             }
-        }
 
+        }
+        else if(acquire())
+        {
+            routeToAlternate(null, null);
+        }
     }
 
-
     @Override
     public QueueConsumer getDeliveredConsumer()
     {
@@ -528,6 +540,11 @@ public abstract class QueueEntryImpl imp
 
     public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
     {
+        if (!isAcquired())
+        {
+            throw new IllegalStateException("Illegal queue entry state. " + this + " is not acquired.");
+        }
+
         final Queue<?> currentQueue = getQueue();
         Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
         boolean autocommit =  txn == null;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Tue Apr  5 08:58:56 2016
@@ -504,6 +504,12 @@ public abstract class AbstractSystemMess
         }
 
         @Override
+        public void release(ConsumerImpl consumer)
+        {
+            release();
+        }
+
+        @Override
         public boolean resend()
         {
             return false;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Tue Apr  5 08:58:56 2016
@@ -158,6 +158,11 @@ public class MockMessageInstance impleme
     }
 
     @Override
+    public void release(final ConsumerImpl release)
+    {
+    }
+
+    @Override
     public boolean resend()
     {
         return false;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Tue Apr  5 08:58:56 2016
@@ -405,10 +405,9 @@ public class ConsumerTarget_0_10 extends
     void reject(final MessageInstance entry)
     {
         entry.setRedelivered();
-        entry.routeToAlternate(null, null);
-        if(isAcquiredByConsumer(entry))
+        if (entry.lockAcquisition())
         {
-            entry.delete();
+            entry.routeToAlternate(null, null);
         }
     }
 
@@ -441,7 +440,7 @@ public class ConsumerTarget_0_10 extends
         }
         else
         {
-            entry.release();
+            entry.release(entry.getAcquiringConsumer());
         }
     }
 
@@ -449,17 +448,20 @@ public class ConsumerTarget_0_10 extends
     {
         final ServerMessage msg = entry.getMessage();
 
-        int requeues = entry.routeToAlternate(new Action<MessageInstance>()
-                    {
-                        @Override
-                        public void performAction(final MessageInstance requeueEntry)
-                        {
-                            getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
-                                                                                   requeueEntry.getOwningResource()
-                                                                                           .getName()));
-                        }
-                    }, null);
-
+        int requeues = 0;
+        if (entry.lockAcquisition())
+        {
+            requeues = entry.routeToAlternate(new Action<MessageInstance>()
+            {
+                @Override
+                public void performAction(final MessageInstance requeueEntry)
+                {
+                    getEventLogger().message(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+                                                                           requeueEntry.getOwningResource()
+                                                                                   .getName()));
+                }
+            }, null);
+        }
         if (requeues == 0)
         {
             TransactionLogResource owningResource = entry.getOwningResource();
@@ -586,20 +588,6 @@ public class ConsumerTarget_0_10 extends
         return _stopped.get();
     }
 
-    public boolean deleteAcquired(MessageInstance entry)
-    {
-        if(isAcquiredByConsumer(entry))
-        {
-            acquisitionRemoved(entry);
-            entry.delete();
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
-
     @Override
     public void acquisitionRemoved(final MessageInstance entry)
     {

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Tue Apr  5 08:58:56 2016
@@ -47,40 +47,17 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onAccept()
     {
-        if(_target != null && _entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
-        {
-            _target.getSessionModel().acknowledge(_target, _entry);
-        }
-        else
-        {
-            _logger.debug("MessageAccept received for message which is not been acquired - message may have expired or been removed");
-        }
-
+        _target.getSessionModel().acknowledge(_consumer, _target, _entry);
     }
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_target != null && _entry.isAcquiredBy(_consumer))
-        {
-            _target.release(_entry, setRedelivered);
-        }
-        else
-        {
-            _logger.debug("MessageRelease received for message which has not been acquired - message may have expired or been removed");
-        }
+        _target.release(_entry, setRedelivered);
     }
 
     public void onReject()
     {
-        if(_target != null && _entry.isAcquiredBy(_consumer))
-        {
-            _target.reject(_entry);
-        }
-        else
-        {
-            _logger.debug("MessageReject received for message which has not been acquired - message may have expired or been removed");
-        }
-
+        _target.reject(_entry);
     }
 
     public boolean acquire()

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Tue Apr  5 08:58:56 2016
@@ -51,27 +51,13 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_entry.isAcquiredBy(_consumer))
-        {
-            _target.release(_entry, setRedelivered);
-        }
-        else
-        {
-            _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
-        }
+        _target.release(_entry, setRedelivered);
+
     }
 
     public void onReject()
     {
-        if(_entry.isAcquiredBy(_consumer))
-        {
-            _target.reject(_entry);
-        }
-        else
-        {
-            _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
-        }
-
+        _target.reject(_entry);
     }
 
     public boolean acquire()

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Tue Apr  5 08:58:56 2016
@@ -58,10 +58,7 @@ public class MessageAcceptCompletionList
         {
             _sub.getCreditManager().restoreCredit(1l, _messageSize);
         }
-        if(_entry.isAcquiredBy(_consumer) && _entry.lockAcquisition())
-        {
-            _session.acknowledge(_sub, _entry);
-        }
+        _session.acknowledge(_consumer, _sub, _entry);
 
         _session.removeDispositionListener(method);
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Tue Apr  5 08:58:56 2016
@@ -76,7 +76,6 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.security.*;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AMQPConnection;
@@ -524,25 +523,31 @@ public class ServerSession extends Sessi
         // Broker shouldn't block awaiting close - thus do override this method to do nothing
     }
 
-    public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
+    public void acknowledge(final ConsumerImpl consumer,
+                            final ConsumerTarget_0_10 target,
+                            final MessageInstance entry)
     {
-        _transaction.dequeue(entry.getEnqueueRecord(),
-                             new ServerTransaction.Action()
-                             {
-
-                                 public void postCommit()
+        if (entry.lockAcquisition())
+        {
+            _transaction.dequeue(entry.getEnqueueRecord(),
+                                 new ServerTransaction.Action()
                                  {
-                                     sub.deleteAcquired(entry);
-                                 }
 
-                                 public void onRollback()
-                                 {
-                                     // The client has acknowledge the message and therefore have seen it.
-                                     // In the event of rollback, the message must be marked as redelivered.
-                                     entry.setRedelivered();
-                                     entry.release();
-                                 }
-                             });
+                                     public void postCommit()
+                                     {
+                                         target.acquisitionRemoved(entry);
+                                         entry.delete();
+                                     }
+
+                                     public void onRollback()
+                                     {
+                                         // The client has acknowledge the message and therefore have seen it.
+                                         // In the event of rollback, the message must be marked as redelivered.
+                                         entry.setRedelivered();
+                                         entry.release(consumer);
+                                     }
+                                 });
+        }
     }
 
     Collection<ConsumerTarget_0_10> getSubscriptions()

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Apr  5 08:58:56 2016
@@ -951,7 +951,7 @@ public class AMQChannel
      * this same channel or to other subscribers.
      *
      */
-    public void requeue()
+    private void requeue()
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -971,7 +971,7 @@ public class AMQChannel
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            unacked.release();
+            unacked.release(unacked.getAcquiringConsumer());
         }
 
     }
@@ -992,8 +992,7 @@ public class AMQChannel
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            unacked.release();
-
+            unacked.release(unacked.getAcquiringConsumer());
         }
         else
         {
@@ -1033,7 +1032,7 @@ public class AMQChannel
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
      */
-    public void resend()
+    private void resend()
     {
 
 
@@ -1106,7 +1105,7 @@ public class AMQChannel
             _unacknowledgedMessageMap.remove(deliveryTag);
 
             message.setRedelivered();
-            message.release();
+            message.release(message.getAcquiringConsumer());
 
         }
     }
@@ -1120,7 +1119,7 @@ public class AMQChannel
      *                    acknowledges the single message specified by the delivery tag
      *
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    private void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1253,7 +1252,7 @@ public class AMQChannel
         _uncommittedMessages.clear();
     }
 
-    public void rollback(Runnable postRollbackTask)
+    private void rollback(Runnable postRollbackTask)
     {
 
         // stop all subscriptions
@@ -1282,10 +1281,10 @@ public class AMQChannel
 
         for(MessageInstance entry : _resendList)
         {
-            ConsumerImpl sub = entry.getDeliveredConsumer();
-            if(sub == null || sub.isClosed())
+            ConsumerImpl sub = entry.getAcquiringConsumer();
+            if (sub == null || sub.isClosed())
             {
-                entry.release();
+                entry.release(sub);
             }
             else
             {
@@ -1623,7 +1622,7 @@ public class AMQChannel
                 {
                     for(MessageInstance entry : _ackedMessages)
                     {
-                        entry.release();
+                        entry.release(entry.getAcquiringConsumer());
                     }
                 }
                 finally
@@ -1764,7 +1763,7 @@ public class AMQChannel
         _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
     }
 
-    public void deadLetter(long deliveryTag)
+    private void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
         final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
@@ -1776,9 +1775,10 @@ public class AMQChannel
         else
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
-
-
-            int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
+            int requeues = 0;
+            if (rejectedQueueEntry.lockAcquisition())
+            {
+                requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
                 {
                     @Override
                     public void performAction(final MessageInstance requeueEntry)
@@ -1789,6 +1789,7 @@ public class AMQChannel
                                                                                                         .getName()));
                     }
                 }, null);
+            }
 
             if(requeues == 0)
             {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Tue Apr  5 08:58:56 2016
@@ -62,10 +62,6 @@ public class ExtractResendAndRequeue imp
                 _msgToRequeue.put(deliveryTag, message);
             }
         }
-        else
-        {
-            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
-        }
 
         // false means continue processing
         return false;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Apr  5 08:58:56 2016
@@ -240,13 +240,8 @@ class ConsumerTarget_1_0 extends Abstrac
 
                                 public void onRollback()
                                 {
-                                    if(entry.isAcquiredBy(getConsumer()))
-                                    {
-                                        entry.release();
-                                        _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
-
-
-                                    }
+                                    entry.release(getConsumer());
+                                    _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
                                 }
                             });
                         }
@@ -257,7 +252,7 @@ class ConsumerTarget_1_0 extends Abstrac
                 }
                 else
                 {
-                    entry.release();
+                    entry.release(getConsumer());
                 }
             }
         }
@@ -429,7 +424,7 @@ class ConsumerTarget_1_0 extends Abstrac
                                 _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
                                 _link.getEndpoint().sendFlowConditional();
                                 _queueEntry.incrementDeliveryCount();
-                                _queueEntry.release();
+                                _queueEntry.release(getConsumer());
                             }
                         }
                     });
@@ -441,7 +436,7 @@ class ConsumerTarget_1_0 extends Abstrac
                     public void postCommit()
                     {
 
-                        _queueEntry.release();
+                        _queueEntry.release(getConsumer());
                         _link.getEndpoint().settle(_deliveryTag);
                     }
 
@@ -459,7 +454,7 @@ class ConsumerTarget_1_0 extends Abstrac
                     public void postCommit()
                     {
 
-                        _queueEntry.release();
+                        _queueEntry.release(getConsumer());
                         if(Boolean.TRUE.equals(((Modified)outcome).getDeliveryFailed()))
                         {
                             _queueEntry.incrementDeliveryCount();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Tue Apr  5 08:58:56 2016
@@ -622,7 +622,7 @@ public class SendingLink_1_0 implements
             if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
             {
                 queueEntry.setRedelivered();
-                queueEntry.release();
+                queueEntry.release(_consumer);
                 _unsettledMap.remove(deliveryTag);
             }
             else if(initialUnsettledMap.get(deliveryTag) instanceof Outcome)
@@ -661,12 +661,11 @@ public class SendingLink_1_0 implements
                                 {
                                     public void postCommit()
                                     {
-                                        queueEntry.release();
+                                        queueEntry.release(_consumer);
                                     }
 
                                     public void onRollback()
                                     {
-                                        //To change body of implemented methods use File | Settings | File Templates.
                                     }
                                 });
                     }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Tue Apr  5 08:58:56 2016
@@ -1174,6 +1174,12 @@ class ManagementNode implements MessageS
         }
 
         @Override
+        public void release(final ConsumerImpl release)
+        {
+
+        }
+
+        @Override
         public boolean resend()
         {
             return false;

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Tue Apr  5 08:58:56 2016
@@ -213,6 +213,12 @@ class ManagementResponse implements Mess
     }
 
     @Override
+    public void release(final ConsumerImpl release)
+    {
+        release();
+    }
+
+    @Override
     public boolean resend()
     {
         return false;

Added: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java?rev=1737804&view=auto
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java (added)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LiveQueueOperationsTest.java Tue Apr  5 08:58:56 2016
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.systest.rest.RestTestHelper;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class LiveQueueOperationsTest extends QpidBrokerTestCase
+{
+
+    private static final int MAX_DELIVERY_COUNT = 2;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        getDefaultBrokerConfiguration().addHttpManagementConfiguration();
+        setTestSystemProperty("queue.deadLetterQueueEnabled","true");
+        setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
+
+        // Set client-side flag to allow the server to determine if messages
+        // dead-lettered or requeued.
+        if (!isBroker010())
+        {
+            setTestClientSystemProperty(ClientProperties.REJECT_BEHAVIOUR_PROP_NAME, RejectBehaviour.SERVER.toString());
+        }
+
+        super.setUp();
+    }
+
+    public void testClearQueueOperationWithActiveConsumerDlqAll() throws Exception
+    {
+        final String virtualHostName = TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST;
+        RestTestHelper restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+
+        Connection conn = getConnection();
+        conn.start();
+        final Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = getTestQueue();
+        session.createConsumer(queue).close();
+
+        sendMessage(session, queue, 250);
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        final CountDownLatch clearQueueLatch = new CountDownLatch(10);
+        final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
+        consumer.setMessageListener(new MessageListener()
+        {
+            @Override
+            public void onMessage(final Message message)
+            {
+                try
+                {
+                    clearQueueLatch.countDown();
+                    session.rollback();
+                }
+                catch (Throwable t)
+                {
+                    throwableAtomicReference.set(t);
+                }
+            }
+        });
+
+
+        boolean ready = clearQueueLatch.await(30, TimeUnit.SECONDS);
+        assertTrue("Consumer did not reach expected point within timeout", ready);
+
+        final String queueUrl = "queue/" + virtualHostName + "/" + virtualHostName + "/" + queue.getQueueName();
+
+        String clearOperationUrl = queueUrl + "/clearQueue";
+        restTestHelper.submitRequest(clearOperationUrl, "POST", Collections.<String,Object>emptyMap(), 200);
+
+        int queueDepthMessages = 0;
+        for (int i = 0; i < 20; ++i)
+        {
+            Map<String, Object> statistics = getStatistics(restTestHelper, queueUrl);
+            queueDepthMessages = (int) statistics.get("queueDepthMessages");
+            if (queueDepthMessages == 0)
+            {
+                break;
+            }
+            Thread.sleep(250);
+        }
+        assertEquals("Queue depth did not reach 0 within expected time", 0, queueDepthMessages);
+
+        consumer.close();
+
+        Map<String, Object> statistics = getStatistics(restTestHelper, queueUrl);
+        queueDepthMessages = (int) statistics.get("queueDepthMessages");
+        assertEquals("Unexpected queue depth after consumer close", 0, queueDepthMessages);
+
+        assertNull("Unexpected exception thrown", throwableAtomicReference.get());
+    }
+
+    private Map<String, Object> getStatistics(final RestTestHelper restTestHelper, final String objectUrl) throws Exception
+    {
+        Map<String, Object> object = restTestHelper.getJsonAsSingletonList(objectUrl);
+        return (Map<String, Object>) object.get("statistics");
+    }
+}

Modified: qpid/java/trunk/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/CPPExcludes?rev=1737804&r1=1737803&r2=1737804&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/CPPExcludes (original)
+++ qpid/java/trunk/test-profiles/CPPExcludes Tue Apr  5 08:58:56 2016
@@ -105,6 +105,7 @@ org.apache.qpid.server.logging.actors.*
 
 // REST management is used in this test for validation
 org.apache.qpid.server.queue.ModelTest#*
+org.apache.qpid.server.queue.LiveQueueOperationsTest#*
 
 // 0-10 is not supported by the MethodRegistry
 org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org