You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/07/10 13:15:05 UTC

[2/2] qpid-broker-j git commit: QPID-7815: Add support for reject policy

QPID-7815: Add support for reject policy

The patch is based on work implemented by Tomas Vavricka


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c5e340f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c5e340f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c5e340f0

Branch: refs/heads/master
Commit: c5e340f0e85dd30947bc4270d14991036c95cbac
Parents: 8468aa0
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jul 10 14:12:51 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Jul 10 14:14:42 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     |  22 ++-
 .../apache/qpid/server/message/RejectType.java  |  28 ++++
 .../qpid/server/message/RoutingResult.java      |  86 ++++++-----
 .../qpid/server/model/OverflowPolicy.java       |   3 +-
 .../org/apache/qpid/server/model/Queue.java     |  20 ++-
 .../apache/qpid/server/queue/AbstractQueue.java | 115 +++++++++-----
 .../server/queue/CopyMessagesTransaction.java   |  25 ++--
 .../server/queue/DeleteMessagesTransaction.java |   3 +-
 .../FlowToDiskOverflowPolicyHandlerFactory.java |  42 ------
 .../queue/MessageUnacceptableException.java     |  29 ++++
 .../server/queue/MoveMessagesTransaction.java   |  29 ++--
 .../server/queue/NoneOverflowPolicyHandler.java |   3 +-
 .../queue/NoneOverflowPolicyHandlerFactory.java |  43 ------
 .../server/queue/OverflowPolicyHandler.java     |   3 +-
 .../queue/OverflowPolicyHandlerFactory.java     |  29 ----
 ...FlowControlOverflowPolicyHandlerFactory.java |  42 ------
 .../server/queue/QueueEntryTransaction.java     |   8 +-
 .../QueueSizeLimitRespectingTransaction.java    |  77 ++++++++++
 .../qpid/server/queue/RejectPolicyHandler.java  |  98 ++++++++++++
 .../queue/RingOverflowPolicyHandlerFactory.java |  43 ------
 .../qpid/server/store/MemoryMessageStore.java   |  22 +++
 .../apache/qpid/server/store/MessageStore.java  |   9 ++
 .../qpid/server/store/NullMessageStore.java     |  12 ++
 .../server/queue/AbstractQueueTestBase.java     |  65 +++++++-
 .../server/queue/RejectPolicyHandlerTest.java   | 142 ++++++++++++++++++
 .../qpid/server/store/MessageStoreTestCase.java |  15 +-
 .../server/protocol/v0_10/ServerSession.java    |  10 +-
 .../protocol/v0_10/ServerSessionDelegate.java   |  32 +++-
 .../qpid/server/protocol/v0_8/AMQChannel.java   | 134 +++++++----------
 .../protocol/v1_0/ExchangeDestination.java      |  47 ++++--
 .../protocol/v1_0/NodeReceivingDestination.java |  47 ++++--
 .../store/jdbc/AbstractJDBCMessageStore.java    |  20 +++
 .../qpid/test/client/queue/QueuePolicyTest.java | 148 +++++++++++--------
 test-profiles/Java10Excludes                    |   1 -
 test-profiles/JavaExcludes                      |   3 -
 test-profiles/JavaPre010Excludes                |   2 -
 36 files changed, 942 insertions(+), 515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 7260557..e6262a6 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -60,7 +60,6 @@ import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.Xid;
 import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
 import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
 import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
@@ -71,6 +70,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.Xid;
 import org.apache.qpid.server.util.CachingUUIDFactory;
 
 
@@ -107,6 +107,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     private final AtomicLong _inMemorySize = new AtomicLong();
     private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
     private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -1229,6 +1230,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             }
             _messageDataRef = null;
             _inMemorySize.addAndGet(-bytesCleared);
+            if (!_messageDeleteListeners.isEmpty())
+            {
+                for (final MessageDeleteListener messageDeleteListener : _messageDeleteListeners)
+                {
+                    messageDeleteListener.messageDeleted(this);
+                }
+            }
         }
 
         @Override
@@ -1406,6 +1414,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore
 
     }
 
+    @Override
+    public void addMessageDeleteListener(final MessageDeleteListener listener)
+    {
+        _messageDeleteListeners.add(listener);
+    }
+
+    @Override
+    public void removeMessageDeleteListener(final MessageDeleteListener listener)
+    {
+        _messageDeleteListeners.remove(listener);
+    }
+
     private static class BDBStoredXidRecord implements org.apache.qpid.server.store.Transaction.StoredXidRecord
     {
         private final long _format;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java b/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java
new file mode 100644
index 0000000..85c458b
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public enum RejectType
+{
+    LIMIT_EXCEEDED,
+    ALREADY_ENQUEUED,
+    PRECONDITION_FAILED
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
index 29c31b3..55c63e6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
@@ -43,7 +43,7 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
     private final M _message;
 
     private final Set<BaseQueue> _queues = new HashSet<>();
-    private final Map<BaseQueue, CharSequence> _notAcceptingRoutableQueues = new HashMap<>();
+    private final Map<BaseQueue, RejectReason> _rejectingRoutableQueues = new HashMap<>();
 
     public RoutingResult(final M message)
     {
@@ -86,11 +86,11 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
     public void add(RoutingResult<M> result)
     {
         addQueues(result._queues);
-        for (Map.Entry<BaseQueue, CharSequence> e : result._notAcceptingRoutableQueues.entrySet())
+        for (Map.Entry<BaseQueue, RejectReason> e : result._rejectingRoutableQueues.entrySet())
         {
             if (!e.getKey().isDeleted())
             {
-                _notAcceptingRoutableQueues.put(e.getKey(), e.getValue());
+                _rejectingRoutableQueues.put(e.getKey(), e.getValue());
             }
         }
     }
@@ -98,31 +98,12 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
     public int send(ServerTransaction txn,
                     final Action<? super MessageInstance> postEnqueueAction)
     {
-        for(BaseQueue q : _queues)
+        if (containsReject(RejectType.LIMIT_EXCEEDED, RejectType.PRECONDITION_FAILED))
         {
-            if(!_message.isResourceAcceptable(q))
-            {
-                return 0;
-            }
+            return 0;
         }
-        final BaseQueue[] baseQueues;
 
-        if(_message.isReferenced())
-        {
-            ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(_queues.size());
-            for(BaseQueue q : _queues)
-            {
-                if(!_message.isReferenced(q))
-                {
-                    uniqueQueues.add(q);
-                }
-            }
-            baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]);
-        }
-        else
-        {
-            baseQueues = _queues.toArray(new BaseQueue[_queues.size()]);
-        }
+        final BaseQueue[] queues = _queues.toArray(new BaseQueue[_queues.size()]);
         txn.enqueue(_queues, _message, new ServerTransaction.EnqueueAction()
         {
             MessageReference _reference = _message.newReference();
@@ -131,9 +112,9 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
             {
                 try
                 {
-                    for(int i = 0; i < baseQueues.length; i++)
+                    for(int i = 0; i < queues.length; i++)
                     {
-                        baseQueues[i].enqueue(_message, postEnqueueAction, records[i]);
+                        queues[i].enqueue(_message, postEnqueueAction, records[i]);
                     }
                 }
                 finally
@@ -155,27 +136,64 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
         return !_queues.isEmpty();
     }
 
-    public void addNotAcceptingRoutableQueue(BaseQueue q, CharSequence reason)
+    public void addRejectReason(BaseQueue q, final RejectType rejectType, String reason)
+    {
+        _rejectingRoutableQueues.put(q, new RejectReason(rejectType, reason));
+    }
+
+    public boolean isRejected()
     {
-        _notAcceptingRoutableQueues.put(q, reason);
+        return !_rejectingRoutableQueues.isEmpty();
     }
 
-    public boolean hasNotAcceptingRoutableQueue()
+    public boolean containsReject(RejectType... type)
     {
-        return !_notAcceptingRoutableQueues.isEmpty();
+        for(RejectReason reason: _rejectingRoutableQueues.values())
+        {
+            for(RejectType t: type)
+            {
+                if (reason.getRejectType() == t)
+                {
+                    return true;
+                }
+            }
+        }
+        return false;
     }
 
-    public String getUnacceptanceCause()
+    public String getRejectReason()
     {
         StringBuilder refusalMessages = new StringBuilder();
-        for (CharSequence message : _notAcceptingRoutableQueues.values())
+        for (RejectReason reason : _rejectingRoutableQueues.values())
         {
             if (refusalMessages.length() > 0)
             {
                 refusalMessages.append(";");
             }
-            refusalMessages.append(message);
+            refusalMessages.append(reason.getReason());
         }
         return refusalMessages.toString();
     }
+
+    private static class RejectReason
+    {
+        private final RejectType _rejectType;
+        private final String _reason;
+
+        private RejectReason(final RejectType rejectType, final String reason)
+        {
+            _rejectType = rejectType;
+            _reason = reason;
+        }
+
+        private RejectType getRejectType()
+        {
+            return _rejectType;
+        }
+
+        public String getReason()
+        {
+            return _reason;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
index 57f334d..a742b78 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
@@ -25,5 +25,6 @@ public enum OverflowPolicy
     NONE,
     RING,
     PRODUCER_FLOW_CONTROL,
-    FLOW_TO_DISK
+    FLOW_TO_DISK,
+    REJECT
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 148291c..ae9bdf7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -275,17 +275,15 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
 
     @ManagedAttribute(defaultValue = "${queue.defaultOverflowPolicy}",
             description = "Queue overflow policy."
-                          + " Current options are ProducerFlowControl, Ring, FlowToDisk, and None."
-                          + " ProducerFlowControl overflow policy - when queue message number or size of messages"
-                          + " in queue exceeds maximum, the producing sessions are blocked until queue depth falls"
-                          + " below the resume threshold."
-                          + " Ring overflow policy - when queue message number or size of messages in queue exceeds"
-                          + " maximum, oldest messages are discarded."
-                          + " FlowToDisk overflow policy - when queue message number or size of messages"
-                          + " in queue exceeds maximum, new incoming messages are written to disk and immediately"
-                          + " evicted from memory."
-                          + " None overflow policy - queue capacity is unbounded, the attributes defining the limits for"
-                          + " maximum message number and maximum number of bytes are not applied.",
+                          + " Options are ProducerFlowControl, Ring, FlowToDisk, Reject, and None."
+                          + " The policy comes into effect where queue limits described by maximumQueueDepthBytes"
+                          + " and/or maximumQueueDepthMessage are breached."
+                          + " ProducerFlowControl - the producing sessions are blocked until queue size"
+                          + " falls beneath resume threshold (see context variable queue.queueFlowResumeLimit)."
+                          + " Ring - oldest messages are discarded."
+                          + " Reject - incoming messages are rejected."
+                          + " FlowToDisk - new incoming messages are written to disk and immediately evicted from memory."
+                          + " None - queue capacity is unbounded.",
             mandatory = true)
     OverflowPolicy getOverflowPolicy();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6129402..bd2f6a3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -92,6 +92,7 @@ import org.apache.qpid.server.message.MessageInfoImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RejectType;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
@@ -108,6 +109,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.transport.AMQPConnection;
@@ -240,7 +242,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @ManagedAttributeField
     private volatile boolean _holdOnPublishEnabled;
 
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet = "postSetOverflowPolicy")
     private OverflowPolicy _overflowPolicy;
     @ManagedAttributeField
     private long _maximumQueueDepthMessages;
@@ -264,7 +266,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
     private AdvanceConsumersTask _queueHouseKeepingTask;
     private volatile int _bindingCount;
-    private volatile OverflowPolicyHandler _overflowPolicyHandler;
+    private volatile RejectPolicyHandler _rejectPolicyHandler;
+    private volatile OverflowPolicyHandler _postEnqueueOverflowPolicyHandler;
     private long _flowToDiskThreshold;
     private volatile MessageDestination _alternateBindingDestination;
 
@@ -358,8 +361,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
         _logSubject = new QueueLogSubject(this);
 
-        _overflowPolicyHandler = createOverflowPolicyHandler(getOverflowPolicy());
-
         _queueHouseKeepingTask = new AdvanceConsumersTask();
         Subject activeSubject = Subject.getSubject(AccessController.getContext());
         Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
@@ -576,6 +577,48 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         updateAlertChecks();
     }
 
+    private void createOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
+    {
+        MessageStore messageStore = getVirtualHost().getMessageStore();
+
+        if (overflowPolicy == OverflowPolicy.REJECT)
+        {
+            _postEnqueueOverflowPolicyHandler = new NoneOverflowPolicyHandler();
+            _rejectPolicyHandler = new RejectPolicyHandler(this);
+            messageStore.addMessageDeleteListener(_rejectPolicyHandler);
+        }
+        else
+        {
+            if (_rejectPolicyHandler != null)
+            {
+                messageStore.removeMessageDeleteListener(_rejectPolicyHandler);
+                _rejectPolicyHandler = null;
+            }
+
+            OverflowPolicyHandler overflowPolicyHandler;
+            switch (overflowPolicy)
+            {
+                case RING:
+                    overflowPolicyHandler = new RingOverflowPolicyHandler(this, getEventLogger());
+                    break;
+                case PRODUCER_FLOW_CONTROL:
+                    overflowPolicyHandler = new ProducerFlowControlOverflowPolicyHandler(this, getEventLogger());
+                    break;
+                case FLOW_TO_DISK:
+                    overflowPolicyHandler = new FlowToDiskOverflowPolicyHandler(this);
+                    break;
+                case NONE:
+                    overflowPolicyHandler = new NoneOverflowPolicyHandler();
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("Overflow policy '%s' is not implemented",
+                                                                  overflowPolicy.name()));
+            }
+
+            _postEnqueueOverflowPolicyHandler = overflowPolicyHandler;
+        }
+    }
+
     protected LogMessage getCreatedLogMessage()
     {
         String ownerString = getOwner();
@@ -1251,7 +1294,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             {
                 action.performAction(entry);
             }
-            _overflowPolicyHandler.checkOverflow(entry);
+
+            RejectPolicyHandler rejectPolicyHandler = _rejectPolicyHandler;
+            if (rejectPolicyHandler != null)
+            {
+                rejectPolicyHandler.postEnqueue(entry);
+            }
+            _postEnqueueOverflowPolicyHandler.checkOverflow(entry);
         }
 
     }
@@ -1841,7 +1890,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @Override
     public void checkCapacity()
     {
-        _overflowPolicyHandler.checkOverflow(null);
+        _postEnqueueOverflowPolicyHandler.checkOverflow(null);
     }
 
     void notifyConsumers(QueueEntry entry)
@@ -2609,15 +2658,31 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         RoutingResult<M> result = new RoutingResult<>(message);
         if (!message.isResourceAcceptable(this))
         {
-            result.addNotAcceptingRoutableQueue(this, String.format("Not accepted by queue '%s'", getName()));
+            result.addRejectReason(this,
+                                   RejectType.PRECONDITION_FAILED,
+                                   String.format("Not accepted by queue '%s'", getName()));
         }
         else if (message.isReferenced(this))
         {
-            result.addNotAcceptingRoutableQueue(this, String.format("Already enqueued on queue '%s'", getName()));
+            result.addRejectReason(this,
+                                   RejectType.ALREADY_ENQUEUED,
+                                   String.format("Already enqueued on queue '%s'", getName()));
         }
         else
         {
-            result.addQueue(this);
+            try
+            {
+                RejectPolicyHandler rejectPolicyHandler = _rejectPolicyHandler;
+                if (rejectPolicyHandler != null)
+                {
+                    rejectPolicyHandler.checkReject(message);
+                }
+                result.addQueue(this);
+            }
+            catch (MessageUnacceptableException e)
+            {
+                result.addRejectReason(this, RejectType.LIMIT_EXCEEDED, e.getMessage());
+            }
         }
         return result;
     }
@@ -2946,9 +3011,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @Override
     public boolean isQueueFlowStopped()
     {
-        if (_overflowPolicyHandler instanceof ProducerFlowControlOverflowPolicyHandler)
+        if (_postEnqueueOverflowPolicyHandler instanceof ProducerFlowControlOverflowPolicyHandler)
         {
-            return ((ProducerFlowControlOverflowPolicyHandler)_overflowPolicyHandler).isQueueFlowStopped();
+            return ((ProducerFlowControlOverflowPolicyHandler) _postEnqueueOverflowPolicyHandler).isQueueFlowStopped();
         }
         return false;
     }
@@ -2995,31 +3060,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     }
 
-    @Override
-    protected void changeAttributes(Map<String,Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
-    {
-        OverflowPolicy oldOverflowPolicy = getOverflowPolicy();
-        super.changeAttributes(attributes);
-
-        OverflowPolicy newOverflowPolicy = getOverflowPolicy();
-        if (oldOverflowPolicy != newOverflowPolicy)
-        {
-            _overflowPolicyHandler = createOverflowPolicyHandler(newOverflowPolicy);
-            _overflowPolicyHandler.checkOverflow(null);
-        }
-    }
-
-    private OverflowPolicyHandler createOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
+    @SuppressWarnings("ignore")
+    private void postSetOverflowPolicy()
     {
-        OverflowPolicyHandlerFactory factory =
-                new QpidServiceLoader().getInstancesByType(OverflowPolicyHandlerFactory.class)
-                                       .get(String.valueOf(overflowPolicy));
-        if (factory == null)
-        {
-            throw new IllegalStateException(String.format("Factory for overflow policy '%s' is not found",
-                                                          overflowPolicy.name()));
-        }
-        return factory.create(this, getEventLogger());
+        createOverflowPolicyHandler(getOverflowPolicy());
+        _postEnqueueOverflowPolicyHandler.checkOverflow(null);
     }
 
     private static final String[] NON_NEGATIVE_NUMBERS = {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
index cf073f5..fb7ec30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
@@ -23,30 +23,25 @@ package org.apache.qpid.server.queue;
 import java.util.List;
 
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
-public class CopyMessagesTransaction extends QueueEntryTransaction
+class CopyMessagesTransaction extends QueueSizeLimitRespectingTransaction
 {
-    private final Queue _destinationQueue;
 
-    public CopyMessagesTransaction(Queue sourceQueue,
-                                   List<Long> messageIds,
-                                   Queue destinationQueue,
-                                   final MessageFilter filter, final int limit)
+    CopyMessagesTransaction(Queue sourceQueue,
+                            List<Long> messageIds,
+                            Queue destinationQueue,
+                            final MessageFilter filter, final int limit)
     {
-        super(sourceQueue, messageIds, filter, limit);
-        _destinationQueue = destinationQueue;
+        super(sourceQueue, messageIds, destinationQueue, filter, limit);
     }
 
     @Override
-    protected void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+    void performOperation(final QueueEntry entry,
+                          final QueueManagingVirtualHost.Transaction txn,
+                          final Queue destinationQueue)
     {
-        ServerMessage msg = entry.getMessage();
-        if(msg != null && !msg.isReferenced(_destinationQueue))
-        {
-            txn.copy(entry, _destinationQueue);
-        }
+        txn.copy(entry, destinationQueue);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
index 84aa0ed..1357ce7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
@@ -37,8 +37,9 @@ public class DeleteMessagesTransaction extends QueueEntryTransaction
     }
 
     @Override
-    protected void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+    protected boolean updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
     {
         txn.dequeue(entry);
+        return false;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
deleted file mode 100644
index abcf98d..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@SuppressWarnings("unused")
-@PluggableService
-public class FlowToDiskOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
-    @Override
-    public String getType()
-    {
-        return OverflowPolicy.FLOW_TO_DISK.name();
-    }
-
-    @Override
-    public OverflowPolicyHandler create(final Queue<?> queue, final EventLogger eventLogger)
-    {
-        return new FlowToDiskOverflowPolicyHandler(queue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java
new file mode 100644
index 0000000..48579c6
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * Checked exception signalling that a message is not acceptable to a queue.
+ * <p>
+ * {@code RejectPolicyHandler.checkReject} throws it when accepting a message would take the
+ * queue beyond its maximum depth; the exception message carries the details.
+ */
+public class MessageUnacceptableException extends Exception
+{
+    /**
+     * @param message description of why the message was refused
+     */
+    public MessageUnacceptableException(final String message)
+    {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
index 7e5ba68..4e42c08 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
@@ -23,30 +23,25 @@ package org.apache.qpid.server.queue;
 import java.util.List;
 
 import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
-public class MoveMessagesTransaction extends QueueEntryTransaction
+public class MoveMessagesTransaction extends QueueSizeLimitRespectingTransaction
 {
-    private final Queue _destinationQueue;
-
-    public MoveMessagesTransaction(Queue sourceQueue,
-                                   List<Long> messageIds,
-                                   Queue destinationQueue,
-                                   final MessageFilter filter, final int limit)
+    MoveMessagesTransaction(Queue sourceQueue,
+                            List<Long> messageIds,
+                            Queue destinationQueue,
+                            final MessageFilter filter,
+                            final int limit)
     {
-        super(sourceQueue, messageIds, filter, limit);
-        _destinationQueue = destinationQueue;
+        super(sourceQueue, messageIds, destinationQueue, filter, limit);
     }
 
-    @Override
-    protected void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+
+    void performOperation(final QueueEntry entry,
+                          final QueueManagingVirtualHost.Transaction txn,
+                          final Queue destinationQueue)
     {
-        ServerMessage msg = entry.getMessage();
-        if(msg != null && !msg.isReferenced(_destinationQueue))
-        {
-            txn.move(entry, _destinationQueue);
-        }
+        txn.move(entry, destinationQueue);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
index 61314b4..1d7ed77 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
@@ -21,10 +21,9 @@ package org.apache.qpid.server.queue;
 
 public class NoneOverflowPolicyHandler implements OverflowPolicyHandler
 {
+
     @Override
     public void checkOverflow(final QueueEntry newlyEnqueued)
     {
-        // noop
     }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
deleted file mode 100644
index f11da7d..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@PluggableService
-public class NoneOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
-
-    @Override
-    public String getType()
-    {
-        return OverflowPolicy.NONE.name();
-    }
-
-    @Override
-    public OverflowPolicyHandler create(final Queue<?> queue,
-                                        final EventLogger eventLogger)
-    {
-        return new NoneOverflowPolicyHandler();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
index 8c221b3..d396985 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
@@ -18,10 +18,9 @@
  * under the License.
  *
  */
-
 package org.apache.qpid.server.queue;
 
 public interface OverflowPolicyHandler
 {
-    void checkOverflow(final QueueEntry newlyEnqueued);
+    void checkOverflow(QueueEntry newlyEnqueued);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
deleted file mode 100644
index bb416cf..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.Pluggable;
-
-public interface OverflowPolicyHandlerFactory extends Pluggable
-{
-    OverflowPolicyHandler create(Queue<?> queue, final EventLogger eventLogger);
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
deleted file mode 100644
index 3219683..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@PluggableService
-public class ProducerFlowControlOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
-    @Override
-    public String getType()
-    {
-        return OverflowPolicy.PRODUCER_FLOW_CONTROL.name();
-    }
-
-    @Override
-    public OverflowPolicyHandler create(final Queue<?> queue,
-                                        final EventLogger eventLogger)
-    {
-        return new ProducerFlowControlOverflowPolicyHandler(queue, eventLogger);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
index 26de745..ddeee1f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
@@ -55,13 +55,14 @@ abstract class QueueEntryTransaction implements QueueManagingVirtualHost.Transac
                 public boolean visit(final QueueEntry entry)
                 {
                     final ServerMessage message = entry.getMessage();
+                    boolean stop = false;
                     if (message != null)
                     {
                         final long messageId = message.getMessageNumber();
                         if ((_messageIds == null || _messageIds.remove(messageId))
                             && (_filter == null || _filter.matches(entry.asFilterable())))
                         {
-                            updateEntry(entry, txn);
+                            stop = updateEntry(entry, txn);
                             _modifiedMessageIds.add(messageId);
                             if (_limit > 0)
                             {
@@ -69,15 +70,14 @@ abstract class QueueEntryTransaction implements QueueManagingVirtualHost.Transac
                             }
                         }
                     }
-                    return _limit == 0 || (_messageIds != null && _messageIds.isEmpty());
+                    return stop || _limit == 0 || (_messageIds != null && _messageIds.isEmpty());
                 }
             });
         }
 
     }
 
-
-    protected abstract void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn);
+    protected abstract boolean updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn);
 
     @Override
     public final List<Long> getModifiedMessageIds()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java
new file mode 100644
index 0000000..f361466
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.List;
+
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
+/**
+ * Base class for queue entry transactions that transfer entries onto a destination queue
+ * without taking that queue beyond its configured depth limits.
+ * <p>
+ * A running total of the entries offered to {@code updateEntry} is added to the destination's current
+ * depth when testing the limits; {@code updateEntry} returns {@code true} once they would be exceeded.
+ */
+abstract class QueueSizeLimitRespectingTransaction extends QueueEntryTransaction
+{
+    private final Queue _destinationQueue;
+    private long _pendingQueueDepthBytes;
+    private long _pendingQueueDepthMessages;
+
+    QueueSizeLimitRespectingTransaction(Queue sourceQueue,
+                                        List<Long> messageIds,
+                                        Queue destinationQueue,
+                                        final MessageFilter filter,
+                                        final int limit)
+    {
+        super(sourceQueue, messageIds, filter, limit);
+        _destinationQueue = destinationQueue;
+    }
+
+    /**
+     * Accounts for the entry and, if the destination has room and does not already reference the
+     * message, hands the entry to {@code performOperation}.
+     *
+     * @return {@code true} when the destination limits would be exceeded, {@code false} otherwise
+     */
+    @Override
+    protected boolean updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+    {
+        final ServerMessage message = entry.getMessage();
+
+        // Every offered entry counts towards the pending totals, whether or not it ends up transferred.
+        _pendingQueueDepthMessages++;
+        if (message != null)
+        {
+            _pendingQueueDepthBytes += message.getSizeIncludingHeader();
+        }
+
+        final boolean overfull = !isUnderfull();
+        final boolean transferable = message != null && !message.isReferenced(_destinationQueue);
+        if (transferable && !overfull)
+        {
+            performOperation(entry, txn, _destinationQueue);
+        }
+        return overfull;
+    }
+
+    /**
+     * Applies the concrete operation to an entry that passed the limit and reference checks.
+     */
+    abstract void performOperation(final QueueEntry entry,
+                                   final QueueManagingVirtualHost.Transaction txn,
+                                   final Queue destinationQueue);
+
+    /**
+     * Tests the destination's depth plus the pending totals against its limits.
+     * A destination with overflow policy NONE is always underfull; a negative maximum disables that limit.
+     */
+    private boolean isUnderfull()
+    {
+        if (_destinationQueue.getOverflowPolicy() == OverflowPolicy.NONE)
+        {
+            return true;
+        }
+
+        if (_destinationQueue.getMaximumQueueDepthBytes() >= 0
+            && _destinationQueue.getQueueDepthBytes() + _pendingQueueDepthBytes
+               > _destinationQueue.getMaximumQueueDepthBytes())
+        {
+            return false;
+        }
+
+        return _destinationQueue.getMaximumQueueDepthMessages() < 0
+               || _destinationQueue.getQueueDepthMessages() + _pendingQueueDepthMessages
+                  <= _destinationQueue.getMaximumQueueDepthMessages();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
new file mode 100644
index 0000000..528133a
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
@@ -0,0 +1,118 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+
+
+/**
+ * Enforces the maximum depth of a queue by refusing messages that would take it over its limits.
+ * <p>
+ * Each message that passes {@code checkReject} holds a reservation (one message plus its size in
+ * bytes) that is counted on top of the queue's reported depth. The reservation is dropped when
+ * {@code postEnqueue} or {@code messageDeleted} is invoked for that message.
+ */
+public class RejectPolicyHandler implements MessageStore.MessageDeleteListener
+{
+    private final Queue<?> _queue;
+    private final AtomicLong _pendingDepthBytes = new AtomicLong();
+    private final AtomicInteger _pendingDepthMessages  = new AtomicInteger();
+    private final Map<StoredMessage<?>, Long> _pendingMessages = new ConcurrentHashMap<>();
+
+    RejectPolicyHandler(final Queue<?> queue)
+    {
+        _queue = queue;
+    }
+
+    /** Releases the reservation, if any, held for the deleted stored message. */
+    @Override
+    public void messageDeleted(final StoredMessage<?> m)
+    {
+        decrementPendingCountersIfNecessary(m);
+    }
+
+    /**
+     * Reserves room for the message, or refuses it if the queue limits would be exceeded.
+     * A negative maximum disables the corresponding limit.
+     *
+     * @param newMessage the message to be checked against the queue limits
+     * @throws MessageUnacceptableException if accepting the message would exceed the maximum depth
+     */
+    void checkReject(final ServerMessage<?> newMessage) throws MessageUnacceptableException
+    {
+        final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+        final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+        final int queueDepthMessages = _queue.getQueueDepthMessages();
+        final long queueDepthBytes = _queue.getQueueDepthBytes();
+        final long size = newMessage.getSizeIncludingHeader();
+
+        // Reserve optimistically; the reservation is rolled back below if the message is refused.
+        int pendingMessages = _pendingDepthMessages.addAndGet(1);
+        long pendingBytes = _pendingDepthBytes.addAndGet(size);
+
+        boolean messagesOverflow = maximumQueueDepthMessages >= 0
+                                   && queueDepthMessages + pendingMessages > maximumQueueDepthMessages;
+        boolean bytesOverflow = maximumQueueDepthBytes >= 0
+                                && queueDepthBytes + pendingBytes > maximumQueueDepthBytes;
+        if (bytesOverflow || messagesOverflow)
+        {
+            // Roll back exactly what was reserved above. Subtract the size: adding it again would
+            // leave the pending byte total permanently inflated after every refused message.
+            _pendingDepthBytes.addAndGet(-size);
+            _pendingDepthMessages.addAndGet(-1);
+            final String message = String.format(
+                    "Maximum depth exceeded on '%s' : current=[count: %d, size: %d], max=[count: %d, size: %d]",
+                    _queue.getName(),
+                    queueDepthMessages + pendingMessages,
+                    queueDepthBytes + pendingBytes,
+                    maximumQueueDepthMessages,
+                    maximumQueueDepthBytes);
+            throw new MessageUnacceptableException(message);
+        }
+
+        _pendingMessages.put(newMessage.getStoredMessage(), size);
+    }
+
+    /** Releases the reservation, if any, held for the message of the given instance. */
+    void postEnqueue(MessageInstance instance)
+    {
+        decrementPendingCountersIfNecessary(instance.getMessage().getStoredMessage());
+    }
+
+    /** Drops the reservation for the stored message; does nothing if none is held. */
+    private void decrementPendingCountersIfNecessary(final StoredMessage<?> m)
+    {
+        Long size;
+        if ((size = _pendingMessages.remove(m)) != null)
+        {
+            _pendingDepthBytes.addAndGet(-size);
+            _pendingDepthMessages.addAndGet(-1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
deleted file mode 100644
index 89a7b6b..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@PluggableService
-public class RingOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
-
-    @Override
-    public String getType()
-    {
-        return OverflowPolicy.RING.name();
-    }
-
-    @Override
-    public OverflowPolicyHandler create(final Queue<?> queue,
-                                        final EventLogger eventLogger)
-    {
-        return new RingOverflowPolicyHandler(queue, eventLogger);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index d9992ec..86342fa 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -53,6 +54,8 @@ public class MemoryMessageStore implements MessageStore
     private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
     private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
     private final AtomicLong _inMemorySize = new AtomicLong();
+    private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
 
 
     private final class MemoryMessageStoreTransaction implements Transaction
@@ -282,6 +285,18 @@ public class MemoryMessageStore implements MessageStore
     }
 
     @Override
+    public void addMessageDeleteListener(final MessageDeleteListener listener)
+    {
+        _messageDeleteListeners.add(listener);
+    }
+
+    @Override
+    public void removeMessageDeleteListener(final MessageDeleteListener listener)
+    {
+        _messageDeleteListeners.remove(listener);
+    }
+
+    @Override
     public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(final T metaData)
     {
         long id = getNextMessageId();
@@ -304,6 +319,13 @@ public class MemoryMessageStore implements MessageStore
                 int bytesCleared = metaData.getStorableSize() + metaData.getContentSize();
                 super.remove();
                 _inMemorySize.addAndGet(-bytesCleared);
+                if (!_messageDeleteListeners.isEmpty())
+                {
+                    for (final MessageDeleteListener messageDeleteListener : _messageDeleteListeners)
+                    {
+                        messageDeleteListener.messageDeleted(this);
+                    }
+                }
             }
         };
         _inMemorySize.addAndGet(metaData.getStorableSize());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index b50dbb1..a6bf9cf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -81,6 +81,10 @@ public interface MessageStore
 
     void onDelete(ConfiguredObject<?> parent);
 
+    void addMessageDeleteListener(MessageDeleteListener listener);
+
+    void removeMessageDeleteListener(MessageDeleteListener listener);
+
     MessageStoreReader newMessageStoreReader();
 
     interface MessageStoreReader
@@ -96,4 +100,9 @@ public interface MessageStore
         void close();
     }
 
+    interface MessageDeleteListener
+    {
+        void messageDeleted(StoredMessage<?> m);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 37c79e5..e6f6e29 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -198,4 +198,16 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
     {
 
     }
+
+    @Override
+    public void addMessageDeleteListener(final MessageDeleteListener listener)
+    {
+
+    }
+
+    @Override
+    public void removeMessageDeleteListener(final MessageDeleteListener listener)
+    {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index bdf4f31..163d54a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -47,11 +47,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.TestConsumerTarget;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.DirectExchangeImpl;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
@@ -64,9 +64,9 @@ import org.apache.qpid.server.model.AlternateBinding;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
-import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -952,7 +952,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
 
         message = createMessage(new Long(27), 20, 10);
         result = queue.route(message, message.getInitialRoutingAddress(), null);
-        assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
+        assertTrue("Result should include not accepting route", result.isRejected());
 
         int headerSize = 20;
         int payloadSize = 10;
@@ -965,7 +965,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
 
         message = createMessage(new Long(id), headerSize, payloadSize);
         result = queue.route(message, message.getInitialRoutingAddress(), null);
-        assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
+        assertTrue("Result should include not accepting route", result.isRejected());
     }
 
     public void testAlternateBindingValidationRejectsNonExistingDestination()
@@ -1055,6 +1055,63 @@ abstract class AbstractQueueTestBase extends QpidTestCase
         assertFalse(_queue.isDeleted());
     }
 
+    public void testMoveMessages() throws Exception
+    {
+        doMoveOrCopyMessageTest(true); // true selects the move variant of the shared scenario
+    }
+
+    public void testCopyMessages() throws Exception
+    {
+        doMoveOrCopyMessageTest(false); // false selects the copy variant of the shared scenario
+    }
+
+    private void doMoveOrCopyMessageTest(final boolean move)
+    {
+        Queue target = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName() + "_target"));
+
+        _queue.enqueue(createMessage(1L), null, null); // seed the source queue with three messages
+        _queue.enqueue(createMessage(2L), null, null);
+        _queue.enqueue(createMessage(3L), null, null);
+
+        assertEquals("Unexpected number of messages on source queue", 3, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue before test", 0, target.getQueueDepthMessages());
+
+        if (move) // both branches pass the always-true selector "true = true"; all three messages are expected on the target
+        {
+            _queue.moveMessages(target, null, "true = true", -1); // NOTE(review): -1 presumably means "no limit" - confirm
+        }
+        else
+        {
+            _queue.copyMessages(target, null, "true = true", -1);
+
+        }
+
+        assertEquals("Unexpected number of messages on source queue after test", move ? 0 : 3, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue after test", 3, target.getQueueDepthMessages());
+    }
+
+    public void testCopyMessageRespectsQueueSizeLimits() throws Exception
+    {
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, getTestName() + "_target");
+        attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 2); // target is capped at two messages
+
+        Queue target = _virtualHost.createChild(Queue.class, attributes);
+
+        _queue.enqueue(createMessage(1L), null, null); // source holds one message more than the target cap
+        _queue.enqueue(createMessage(2L), null, null);
+        _queue.enqueue(createMessage(3L), null, null);
+
+        assertEquals("Unexpected number of messages on source queue", 3, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue before test", 0, target.getQueueDepthMessages());
+
+        _queue.copyMessages(target, null, "true = true", -1); // try to copy all three; only two are expected to remain on the target
+
+        assertEquals("Unexpected number of messages on source queue after test", 3, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue after test", 2, target.getQueueDepthMessages());
+    }
+
     private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
     {
         final List<QueueEntry> entries = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java
new file mode 100644
index 0000000..cc821ee
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java
@@ -0,0 +1,142 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class RejectPolicyHandlerTest extends QpidTestCase
+{
+    private RejectPolicyHandler _rejectOverflowPolicyHandler;
+    private Queue<?> _queue;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        _queue = mock(Queue.class); // the handler under test only ever sees this mocked queue
+        when(_queue.getName()).thenReturn("testQueue");
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(-1L); // NOTE(review): -1 presumably means "no limit" - confirm
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
+        when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.REJECT);
+        when(_queue.getQueueDepthMessages()).thenReturn(0);
+
+        _rejectOverflowPolicyHandler = new RejectPolicyHandler(_queue);
+    }
+
+    public void testOverfullBytes() throws Exception
+    {
+        ServerMessage incomingMessage = createIncomingMessage(6);
+        when(_queue.getQueueDepthBytes()).thenReturn(5L);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
+        when(_queue.getQueueDepthMessages()).thenReturn(1);
+
+        try
+        {
+            _rejectOverflowPolicyHandler.checkReject(incomingMessage);
+            fail("Exception expected");
+        }
+        catch (MessageUnacceptableException e)
+        {
+            // pass: rejection is expected - current depth 5 plus incoming size 6 would exceed the maximum of 10
+        }
+    }
+
+    public void testOverfullMessages() throws Exception
+    {
+        ServerMessage incomingMessage = createIncomingMessage(5);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(7L);
+        when(_queue.getQueueDepthMessages()).thenReturn(7);
+        when(_queue.getQueueDepthBytes()).thenReturn(10L);
+
+        try
+        {
+            _rejectOverflowPolicyHandler.checkReject(incomingMessage);
+            fail("Exception expected");
+        }
+        catch (MessageUnacceptableException e)
+        {
+            // pass: rejection is expected - the mocked queue already reports the maximum of 7 messages
+        }
+    }
+
+    public void testNotOverfullMessages() throws Exception
+    {
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(1L);
+
+        ServerMessage incomingMessage1 = createIncomingMessage(2);
+        MessageInstance messageInstance1 = mock(MessageInstance.class);
+        when(messageInstance1.getMessage()).thenReturn(incomingMessage1);
+
+        ServerMessage incomingMessage2 = createIncomingMessage(2);
+
+        _rejectOverflowPolicyHandler.checkReject(incomingMessage1);
+        _rejectOverflowPolicyHandler.postEnqueue(messageInstance1);
+
+        _rejectOverflowPolicyHandler.checkReject(incomingMessage2); // no exception expected; a throw here fails the test
+   }
+    public void testNotOverfullBytes() throws Exception
+    {
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(9L);
+        ServerMessage incomingMessage1 = createIncomingMessage(5);
+        MessageInstance messageInstance1 = mock(MessageInstance.class);
+        when(messageInstance1.getMessage()).thenReturn(incomingMessage1);
+
+        ServerMessage incomingMessage2 = createIncomingMessage(5);
+
+        _rejectOverflowPolicyHandler.checkReject(incomingMessage1);
+        _rejectOverflowPolicyHandler.postEnqueue(messageInstance1);
+
+        _rejectOverflowPolicyHandler.checkReject(incomingMessage2); // no exception expected; a throw here fails the test
+    }
+
+    public void testIncomingMessageDeleted() throws Exception
+    {
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(1L);
+
+        ServerMessage incomingMessage1 = createIncomingMessage(2);
+
+        ServerMessage incomingMessage2 = createIncomingMessage(2);
+
+        _rejectOverflowPolicyHandler.checkReject(incomingMessage1);
+        _rejectOverflowPolicyHandler.messageDeleted(incomingMessage1.getStoredMessage()); // first message is reported deleted without postEnqueue being called
+
+        _rejectOverflowPolicyHandler.checkReject(incomingMessage2); // no exception expected after the deletion notification
+    }
+
+    private ServerMessage createIncomingMessage(final long size)
+    {
+        AMQMessageHeader incomingMessageHeader = mock(AMQMessageHeader.class);
+        ServerMessage incomingMessage = mock(ServerMessage.class);
+        when(incomingMessage.getMessageHeader()).thenReturn(incomingMessageHeader);
+        when(incomingMessage.getSizeIncludingHeader()).thenReturn(size); // the only size the mocked message reports
+        when(incomingMessage.getStoredMessage()).thenReturn(mock(StoredMessage.class));
+        return incomingMessage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index ee8def1..e70e9ff 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -147,7 +147,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
 
     }
 
-    public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName)
+    private void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName)
     {
         Transaction txn = _store.newTransaction();
         txn.enqueueMessage(new TransactionLogResource()
@@ -417,6 +417,19 @@ public abstract class MessageStoreTestCase extends QpidTestCase
         assertNull(retrievedMessageRef.get());
     }
 
+    public void testMessageDeleted() throws Exception
+    {
+        MessageStore.MessageDeleteListener listener = mock(MessageStore.MessageDeleteListener.class);
+        _store.addMessageDeleteListener(listener);
+
+        long messageId = 1;
+        int contentSize = 0;
+        final MessageHandle<TestMessageMetaData> messageHandle = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+        StoredMessage<TestMessageMetaData> message = messageHandle.allContentAdded();
+        message.remove(); // removal is expected to notify the listener registered above
+
+        verify(listener, times(1)).messageDeleted(message); // exactly one notification, carrying the removed message
+    }
 
     private TransactionLogResource createTransactionLogResource(UUID queueId)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 7da0d0f..989ff4e 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -956,9 +956,9 @@ public class ServerSession extends SessionInvoker
         return isCommandsFull(id);
     }
 
-    public int enqueue(final MessageTransferMessage message,
-                       final InstanceProperties instanceProperties,
-                       final MessageDestination exchange)
+    RoutingResult<MessageTransferMessage> enqueue(final MessageTransferMessage message,
+                final InstanceProperties instanceProperties,
+                final MessageDestination exchange)
     {
         if(_outstandingCredit.get() != UNLIMITED_CREDIT
                 && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -970,10 +970,10 @@ public class ServerSession extends SessionInvoker
         final long arrivalTime = message.getArrivalTime();
         final RoutingResult<MessageTransferMessage> result =
                 exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
-        int enqueues = result.send(_transaction, null);
+        result.send(_transaction, null);
         getAMQPConnection().registerMessageReceived(message.getSize(), arrivalTime);
         incrementOutstandingTxnsIfNecessary();
-        return enqueues;
+        return result;
     }
 
     public void sendMessage(MessageTransfer xfr,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 8c3f8c9..ae529a4 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -52,6 +52,8 @@ import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RejectType;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AlternateBinding;
 import org.apache.qpid.server.model.Exchange;
@@ -488,32 +490,50 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         }
                     };
 
-                    int enqueues = serverSession.enqueue(message, instanceProperties, destination);
+                    RoutingResult<MessageTransferMessage> routingResult = serverSession.enqueue(message, instanceProperties, destination);
 
-                    if (enqueues == 0)
+                    boolean explictlyRejected = routingResult.containsReject(RejectType.LIMIT_EXCEEDED);
+                    if (!routingResult.hasRoutes() || explictlyRejected)
                     {
-                        if ((delvProps == null || !delvProps.getDiscardUnroutable())
-                            && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                        boolean closeWhenNoRoute = serverSession.getAMQPConnection().getBroker().getConnection_closeWhenNoRoute();
+                        boolean discardUnroutable = delvProps != null && delvProps.getDiscardUnroutable();
+                        if (!discardUnroutable && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                         {
                             RangeSet rejects = RangeSetFactory.createRangeSet();
                             rejects.add(xfr.getId());
                             MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
                             ssn.invoke(reject);
                         }
+                        else if (!discardUnroutable && closeWhenNoRoute && explictlyRejected)
+                        {
+                            ExecutionErrorCode code = ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED;
+                            String errorMessage = String.format("No route for message with destination '%s' and routing key '%s' : %s",
+                                                                xfr.getDestination(),
+                                                                message.getInitialRoutingAddress(),
+                                                                routingResult.getRejectReason());
+
+                            ExecutionException ex = new ExecutionException();
+                            ex.setErrorCode(code);
+                            ex.setDescription(errorMessage);
+                            serverSession.invoke(ex);
+                            serverSession.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
+                            return;
+                        }
                         else
                         {
                             getEventLogger(ssn).message(ExchangeMessages.DISCARDMSG(destination.getName(),
-                                                                                             messageMetaData.getRoutingKey()));
+                                                                                    messageMetaData.getRoutingKey()));
                         }
                     }
 
+                    // TODO: we currently do not send MessageAccept when AcceptMode is EXPLICIT
                     if (serverSession.isTransactional())
                     {
                         serverSession.processed(xfr);
                     }
                     else
                     {
-                        serverSession.recordFuture(Futures.<Void>immediateFuture(null),
+                        serverSession.recordFuture(Futures.immediateFuture(null),
                                                    new CommandProcessedAction(serverSession, xfr));
                     }
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org