You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/23 17:10:29 UTC

svn commit: r1784177 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/test/java/org/apache/q...

Author: orudyy
Date: Thu Feb 23 17:10:29 2017
New Revision: 1784177

URL: http://svn.apache.org/viewvc?rev=1784177&view=rev
Log:
QPID-7618: [Java Broker] Add ring policy support

Work done by Tomas Vavricka <va...@gmail.com>

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
    qpid/java/trunk/test-profiles/Java010Excludes
    qpid/java/trunk/test-profiles/JavaExcludes
    qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Thu Feb 23 17:10:29 2017
@@ -62,6 +62,12 @@ public abstract class AbstractServerMess
         return _handle.getContentSize();
     }
 
+    @Override
+    public long getSizeIncludingHeader()
+    {
+        return _handle.getMetaData().getContentSize() + _handle.getMetaData().getStorableSize();
+    }
+
     public StoredMessage<T> getStoredMessage()
     {
         return _handle;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java Thu Feb 23 17:10:29 2017
@@ -42,6 +42,14 @@ public class RoutingResult<M extends Ser
 
     private final Set<BaseQueue> _queues = new HashSet<>();
 
+    private int _errorCodeAmqp_0_10;
+
+    private String _errorCodeAmqp_1_0;
+
+    private String _errorMessage;
+
+    private boolean _routingFailure = false;
+
     public RoutingResult(final M message)
     {
         _message = message;
@@ -82,7 +90,17 @@ public class RoutingResult<M extends Ser
 
     public void add(RoutingResult<M> result)
     {
-        addQueues(result._queues);
+        if (result.isRoutingFailure())
+        {
+            _routingFailure = result._routingFailure;
+            _errorCodeAmqp_0_10 = result._errorCodeAmqp_0_10;
+            _errorCodeAmqp_1_0 = result._errorCodeAmqp_1_0;
+            _errorMessage = result._errorMessage;
+        }
+        else
+        {
+            addQueues(result._queues);
+        }
     }
 
     public int send(ServerTransaction txn,
@@ -144,4 +162,32 @@ public class RoutingResult<M extends Ser
     {
         return !_queues.isEmpty();
     }
+
+    public void addRoutingFailure(int errorCodeAmqp_0_10, String errorCodeAmqp_1_0, String reason)
+    {
+        _routingFailure = true;
+        this._errorCodeAmqp_0_10 = errorCodeAmqp_0_10;
+        this._errorCodeAmqp_1_0 = errorCodeAmqp_1_0;
+        _errorMessage = reason;
+    }
+
+    public String getErrorMessage()
+    {
+        return _errorMessage;
+    }
+
+    public int getErrorCodeAmqp_0_10()
+    {
+        return _errorCodeAmqp_0_10;
+    }
+
+    public String getErrorCodeAmqp_1_0()
+    {
+        return _errorCodeAmqp_1_0;
+    }
+
+    public boolean isRoutingFailure()
+    {
+        return _routingFailure;
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Thu Feb 23 17:10:29 2017
@@ -38,6 +38,8 @@ public interface ServerMessage<T extends
 
     long getSize();
 
+    long getSizeIncludingHeader();
+
     long getExpiration();
 
     MessageReference newReference();

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java?rev=1784177&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java Thu Feb 23 17:10:29 2017
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in complianceS
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public enum OverflowPolicy
+{
+    NONE,
+    RING
+}

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Feb 23 17:10:29 2017
@@ -82,7 +82,9 @@ public interface Queue<X extends Queue<X
     String DEFAULT_FILTERS = "defaultFilters";
     String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
     String HOLD_ON_PUBLISH_ENABLED = "holdOnPublishEnabled";
-
+    String OVERFLOW_POLICY = "overflowPolicy";
+    String MAX_COUNT = "maxCount";
+    String MAX_SIZE = "maxSize";
 
     String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
     @SuppressWarnings("unused")
@@ -259,6 +261,27 @@ public interface Queue<X extends Queue<X
                                      + "visible may depend on how frequently the virtual host housekeeping thread runs.")
     boolean isHoldOnPublishEnabled();
 
+    @ManagedContextDefault( name = "queue.defaultMaxCount", description = "Maximum count of messages in queue when policy_type set to Ring")
+    long DEFAULT_MAX_COUNT = 10;
+
+    @ManagedAttribute( defaultValue = "${queue.defaultMaxCount}", description = "Maximum count of messages in queue, when policy_type set to Ring")
+    long getMaxCount();
+
+    @ManagedContextDefault( name = "queue.defaultMaxSize", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
+    long DEFAULT_MAX_SIZE = 1024;
+
+    @ManagedAttribute( defaultValue = "${queue.defaultMaxSize}", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
+    long getMaxSize();
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = "queue.defaultOverflowPolicy")
+    OverflowPolicy DEFAULT_POLICY_TYPE = OverflowPolicy.NONE;
+
+    @ManagedAttribute( defaultValue = "${queue.defaultOverflowPolicy}", description = "Queue overflow policy. Current options are Ring and None." +
+            " Ring overflow policy - when queue message count or size of messages in queue exceeds maximum, oldest message(s) is discarded." +
+            " None overflow policy - maximum size and maximum count properties are not applied.")
+    OverflowPolicy getOverflowPolicy();
+
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
     Collection<PublishingLink> getPublishingLinks();
 
@@ -274,6 +297,7 @@ public interface Queue<X extends Queue<X
 
     int deleteAndReturnCount();
 
+    void deleteEntry(QueueEntry node);
 
     void setNotificationListener(QueueNotificationListener listener);
 
@@ -309,6 +333,9 @@ public interface Queue<X extends Queue<X
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth")
     long getQueueDepthBytes();
 
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth Including Header")
+    long getQueueDepthBytesIncludingHeader();
+
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Queue Depth")
     int getQueueDepthMessages();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Feb 23 17:10:29 2017
@@ -191,6 +191,9 @@ public abstract class AbstractQueue<X ex
     private ExclusivityPolicy _exclusive;
 
     @ManagedAttributeField
+    private OverflowPolicy _overflowPolicy;
+
+    @ManagedAttributeField
     private MessageDurability _messageDurability;
 
     @ManagedAttributeField
@@ -256,7 +259,10 @@ public abstract class AbstractQueue<X ex
     private boolean _ensureNondestructiveConsumers;
     @ManagedAttributeField
     private volatile boolean _holdOnPublishEnabled;
-
+    @ManagedAttributeField
+    private long _maxCount;
+    @ManagedAttributeField
+    private long _maxSize;
 
     private static final int RECOVERING = 1;
     private static final int COMPLETING_RECOVERY = 2;
@@ -674,6 +680,18 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
+    public long getMaxCount()
+    {
+        return _maxCount;
+    }
+
+    @Override
+    public long getMaxSize()
+    {
+        return _maxSize;
+    }
+
+    @Override
     public Collection<String> getAvailableAttributes()
     {
         return new ArrayList<String>(_arguments.keySet());
@@ -1140,6 +1158,7 @@ public abstract class AbstractQueue<X ex
 
     protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
+        applyOverflowPolicy(message);
         final QueueEntry entry = getEntries().add(message, enqueueRecord);
         updateExpiration(entry);
 
@@ -1163,6 +1182,31 @@ public abstract class AbstractQueue<X ex
 
     }
 
+    private void applyOverflowPolicy(final ServerMessage message)
+    {
+        if (getOverflowPolicy() != null)
+        {
+            switch (getOverflowPolicy())
+            {
+                case RING:
+                    while (getQueueDepthMessages() == getMaxCount())
+                    {
+                        QueueEntry entry = getEntries().getOldestEntry();
+                        deleteEntry(entry);
+                    }
+                    while (getQueueDepthBytesIncludingHeader() + message.getSizeIncludingHeader() > getMaxSize())
+                    {
+                        QueueEntry entry = getEntries().getOldestEntry();
+                        deleteEntry(entry);
+                    }
+                    break;
+                case NONE:
+                default:
+                    break;
+            }
+        }
+    }
+
     private void updateExpiration(final QueueEntry entry)
     {
         long expiration = entry.getMessage().getExpiration();
@@ -1336,6 +1380,12 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
+    public long getQueueDepthBytesIncludingHeader()
+    {
+        return _queueStatistics.getQueueSizeIncludingHeader();
+    }
+
+    @Override
     public long getAvailableBytes()
     {
         return _queueStatistics.getAvailableSize();
@@ -1621,6 +1671,25 @@ public abstract class AbstractQueue<X ex
                     });
     }
 
+    public void deleteEntry(final QueueEntry entry)
+    {
+        boolean acquiredForDequeueing = entry.acquireOrSteal(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                dequeueEntry(entry);
+            }
+        });
+
+        if(acquiredForDequeueing)
+        {
+            _logger.debug("Dequeuing expired node {}", entry);
+            // Then dequeue it.
+            dequeueEntry(entry);
+        }
+    }
+
     @Override
     public void addDeleteTask(final Action<? super X> task)
     {
@@ -2086,21 +2155,7 @@ public abstract class AbstractQueue<X ex
                 // If the node has expired then acquire it
                 if (node.expired())
                 {
-                    boolean acquiredForDequeueing = node.acquireOrSteal(new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            dequeueEntry(node);
-                        }
-                    });
-
-                    if(acquiredForDequeueing)
-                    {
-                        _logger.debug("Dequeuing expired node {}", node);
-                        // Then dequeue it.
-                        dequeueEntry(node);
-                    }
+                    deleteEntry(node);
                 }
                 else
                 {
@@ -2581,7 +2636,32 @@ public abstract class AbstractQueue<X ex
             throw new VirtualHostUnavailableException(this._virtualHost);
         }
         RoutingResult<M> result = new RoutingResult<>(message);
-        if(message.isResourceAcceptable(this) && !message.isReferenced(this))
+
+        if(_overflowPolicy == OverflowPolicy.RING && message.getSizeIncludingHeader() > getMaxSize())
+        {
+            result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Message larger than configured maximum depth on " + this.getName()
+                    + ": size=" + message.getSizeIncludingHeader() + ", max-size=" + getMaxSize());
+        }
+        else if(_overflowPolicy == OverflowPolicy.RING && _maxCount == 0)
+        {
+            result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Queue '" + getName() + "' maximum count (0) prevents this message from being accepted");
+        }
+        else if (this instanceof PriorityQueue && message.isResourceAcceptable(this) && !message.isReferenced(this))
+        {
+            if (getMaxCount() == getQueueDepthMessages())
+            {
+                QueueEntry entry = getEntries().getOldestEntry();
+                if(entry.getMessage().getMessageHeader().getPriority() <= message.getMessageHeader().getPriority())
+                {
+                    result.addQueue(this);
+                }
+            }
+            else
+            {
+                result.addQueue(this);
+            }
+        }
+        else if(message.isResourceAcceptable(this) && !message.isReferenced(this))
         {
             result.addQueue(this);
         }
@@ -2874,6 +2954,12 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
+    public OverflowPolicy getOverflowPolicy()
+    {
+        return _overflowPolicy;
+    }
+
+    @Override
     public boolean isNoLocal()
     {
         return _noLocal;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java Thu Feb 23 17:10:29 2017
@@ -48,6 +48,7 @@ abstract class AbstractQueueEntryList im
         final long size = entry.getSize();
         final QueueStatistics queueStatistics = _queueStatistics;
         queueStatistics.addToAvailable(size);
+        queueStatistics.addToDepthIncludingHeader(entry.getSizeWithHeader());
         queueStatistics.addToQueue(size);
         queueStatistics.addToEnqueued(size);
         if(_forcePersistent || (_respectPersistent && entry.getMessage().isPersistent()))
@@ -89,6 +90,7 @@ abstract class AbstractQueueEntryList im
                 break;
             case DELETED:
                 queueStatistics.removeFromQueue(size);
+                queueStatistics.removeFromDepthIncludingHeader(entry.getSizeWithHeader());
                 queueStatistics.addToDequeued(size);
                 if(_forcePersistent || (_respectPersistent && entry.isPersistent()))
                 {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Thu Feb 23 17:10:29 2017
@@ -207,9 +207,9 @@ abstract public class PriorityQueueList
             for(PriorityQueueEntrySubList subList : _priorityLists)
             {
                 QueueEntry subListOldest = subList.getOldestEntry();
-                if(oldest == null || (subListOldest != null && subListOldest.getMessage().getMessageNumber() < oldest.getMessage().getMessageNumber()))
+                if (subListOldest != null)
                 {
-                    oldest = subListOldest;
+                    return subListOldest;
                 }
             }
             return oldest;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Thu Feb 23 17:10:29 2017
@@ -25,6 +25,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 
@@ -65,6 +66,11 @@ public class QueueArgumentsConverter
 
     public static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
     public static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
+
+    public static final String QPID_POLICY_TYPE = "qpid.policy_type";
+    public static final String QPID_MAX_COUNT = "qpid.max_count";
+    public static final String QPID_MAX_SIZE = "qpid.max_size";
+
     /**
      * No-local queue argument is used to support the no-local feature of Durable Subscribers.
      */
@@ -109,6 +115,9 @@ public class QueueArgumentsConverter
         ATTRIBUTE_MAPPINGS.put(QPID_EXCLUSIVITY_POLICY, Queue.EXCLUSIVE);
         ATTRIBUTE_MAPPINGS.put(QPID_LIFETIME_POLICY, Queue.LIFETIME_POLICY);
 
+        ATTRIBUTE_MAPPINGS.put(QPID_POLICY_TYPE, Queue.OVERFLOW_POLICY);
+        ATTRIBUTE_MAPPINGS.put(QPID_MAX_COUNT, Queue.MAX_COUNT);
+        ATTRIBUTE_MAPPINGS.put(QPID_MAX_SIZE, Queue.MAX_SIZE);
     }
 
 
@@ -128,6 +137,10 @@ public class QueueArgumentsConverter
             {
                 modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueue.DEFAULT_LVQ_KEY);
             }
+            if(wireArguments.containsKey(QPID_POLICY_TYPE))
+            {
+                modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
+            }
             if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
             {
                 modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Feb 23 17:10:29 2017
@@ -31,6 +31,8 @@ public interface QueueEntry extends Mess
 
     long getSize();
 
+    long getSizeWithHeader();
+
     boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
 
     boolean isQueueDeleted();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Feb 23 17:10:29 2017
@@ -195,6 +195,11 @@ public abstract class QueueEntryImpl imp
         return getMessage() == null ? 0 : getMessage().getSize();
     }
 
+    public long getSizeWithHeader()
+    {
+        return getMessage() == null ? 0 : getMessage().getSizeIncludingHeader();
+    }
+
     public boolean getDeliveredToConsumer()
     {
         return _deliveryCountUpdater.get(this) != -1;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java Thu Feb 23 17:10:29 2017
@@ -27,6 +27,7 @@ final class QueueStatistics
 {
     private final AtomicInteger _queueCount = new AtomicInteger();
     private final AtomicLong _queueSize = new AtomicLong();
+    private final AtomicLong _queueSizeIncludingHeader = new AtomicLong();
 
     private final AtomicInteger _unackedCount = new AtomicInteger();
     private final AtomicLong _unackedSize = new AtomicLong();
@@ -62,6 +63,11 @@ final class QueueStatistics
         return _queueSize.get();
     }
 
+    public long getQueueSizeIncludingHeader()
+    {
+        return _queueSizeIncludingHeader.get();
+    }
+
     public final int getUnackedCount()
     {
         return _unackedCount.get();
@@ -164,6 +170,16 @@ final class QueueStatistics
         _queueSize.addAndGet(-size);
     }
 
+    public void addToDepthIncludingHeader(long size)
+    {
+        _queueSizeIncludingHeader.addAndGet(size);
+    }
+
+    public void removeFromDepthIncludingHeader(long size)
+    {
+        _queueSizeIncludingHeader.addAndGet(-size);
+    }
+
     void addToAvailable(long size)
     {
         int count = _availableCount.incrementAndGet();

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Feb 23 17:10:29 2017
@@ -57,12 +57,14 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
@@ -840,6 +842,153 @@ abstract class AbstractQueueTestBase ext
         assertEquals(10l,queue.getOldestMessageArrivalTime());
     }
 
+    public void testNoneOverflowPolicy()
+    {
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.MAX_COUNT, 2);
+        attributes.put(Queue.MAX_SIZE, 100);
+
+        Queue<?> queue = getQueue();
+        queue.setAttributes(attributes);
+
+        ServerMessage message = createMessage(new Long(24), 50, 50);
+        when(message.getArrivalTime()).thenReturn(10l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(25), 50, 50);
+        when(message.getArrivalTime()).thenReturn(50l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(26), 50, 50);
+        when(message.getArrivalTime()).thenReturn(200l);
+        queue.enqueue(message, null, null);
+
+        assertEquals("Wrong number of messages in queue",3, queue.getQueueDepthMessages());
+        assertEquals("Wrong size of messages in queue",300, queue.getQueueDepthBytesIncludingHeader());
+        assertEquals("Wrong oldest message", 10l,
+                ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
+        queue.clearQueue();
+
+        attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+        attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
+        queue.setAttributes(attributes);
+    }
+
+    public void testRingOverflowPolicyMaxCount()
+    {
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+        attributes.put(Queue.MAX_COUNT, 4);
+
+        Queue<?> queue = getQueue();
+        queue.setAttributes(attributes);
+
+        ServerMessage message = createMessage(new Long(24), 10, 10);
+        when(message.getArrivalTime()).thenReturn(10l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(25), 10, 10);
+        when(message.getArrivalTime()).thenReturn(50l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(26), 10, 10);
+        when(message.getArrivalTime()).thenReturn(200l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(27), 10, 10);
+        when(message.getArrivalTime()).thenReturn(500l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(28), 10, 10);
+        when(message.getArrivalTime()).thenReturn(1000l);
+        queue.enqueue(message, null, null);
+
+        assertEquals("Wrong number of messages in queue",4, queue.getQueueDepthMessages());
+        assertEquals("Wrong size of messages in queue",80, queue.getQueueDepthBytesIncludingHeader());
+        assertEquals("Wrong oldest message", 50l,
+                ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
+        queue.clearQueue();
+
+        attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
+        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+        queue.setAttributes(attributes);
+    }
+
+    public void testRingOverflowPolicyMaxSize()
+    {
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+        attributes.put(Queue.MAX_COUNT, 4);
+        attributes.put(Queue.MAX_SIZE, 100);
+
+        Queue<?> queue = getQueue();
+        queue.setAttributes(attributes);
+
+        ServerMessage message = createMessage(new Long(24), 10, 10);
+        when(message.getArrivalTime()).thenReturn(10l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(25), 10, 10);
+        when(message.getArrivalTime()).thenReturn(50l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(26), 20, 10);
+        when(message.getArrivalTime()).thenReturn(200l);
+        queue.enqueue(message, null, null);
+        message = createMessage(new Long(27), 20, 10);
+        when(message.getArrivalTime()).thenReturn(200l);
+        queue.enqueue(message, null, null);
+
+        assertEquals("Wrong number of messages in queue",4, queue.getQueueDepthMessages());
+        assertEquals("Wrong size of messages in queue",100, queue.getQueueDepthBytesIncludingHeader());
+
+        message = createMessage(new Long(27), 20, 10);
+        when(message.getArrivalTime()).thenReturn(500l);
+        queue.enqueue(message, null, null);
+
+        assertEquals("Wrong number of messages in queue",3, queue.getQueueDepthMessages());
+        assertEquals("Wrong size of messages in queue",90, queue.getQueueDepthBytesIncludingHeader());
+        assertEquals("Wrong oldest message", 200l,
+                ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
+        queue.clearQueue();
+
+        attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
+        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+        attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
+        queue.setAttributes(attributes);
+    }
+
+    public void testRingOverflowPolicyMessagesRejected()
+    {
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+        attributes.put(Queue.MAX_COUNT, 0);
+
+        Queue<?> queue = getQueue();
+        queue.setAttributes(attributes);
+
+        ServerMessage message;
+        RoutingResult result;
+
+        message = createMessage(new Long(27), 20, 10);
+        result = queue.route(message, message.getInitialRoutingAddress(), null);
+        assertTrue("Result should include routing failure", result.isRoutingFailure());
+
+        int headerSize = 20;
+        int payloadSize = 10;
+        int id = 28;
+
+        attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.MAX_COUNT, 10);
+        attributes.put(Queue.MAX_SIZE, 10);
+        queue.setAttributes(attributes);
+
+        message = createMessage(new Long(id), headerSize, payloadSize);
+        result = queue.route(message, message.getInitialRoutingAddress(), null);
+        assertTrue("Result should include routing failure", result.isRoutingFailure());
+
+        attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
+        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+        attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
+        queue.setAttributes(attributes);
+    }
+
     private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
     {
         final List<QueueEntry> entries = new ArrayList<>();
@@ -1017,6 +1166,13 @@ abstract class AbstractQueueTestBase ext
 
     }
 
+    protected ServerMessage createMessage(Long id, final int headerSize, final int payloadSize)
+    {
+        ServerMessage message = createMessage(id);
+        when(message.getSizeIncludingHeader()).thenReturn(new Long(headerSize + payloadSize));
+        return message;
+    }
+
     protected ServerMessage createMessage(Long id)
     {
         AMQMessageHeader header = mock(AMQMessageHeader.class);

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Thu Feb 23 17:10:29 2017
@@ -154,6 +154,12 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
+        public long getSizeIncludingHeader()
+        {
+            return 0;
+        }
+
+        @Override
         public StoredMessage<TestMessageMetaData> getStoredMessage()
         {
             return _storedMsg;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Thu Feb 23 17:10:29 2017
@@ -85,6 +85,12 @@ class MockServerMessage implements Serve
     }
 
     @Override
+    public long getSizeIncludingHeader()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public String getInitialRoutingAddress()
     {
         throw new UnsupportedOperationException();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Feb 23 17:10:29 2017
@@ -1007,11 +1007,24 @@ public class ServerSession extends Sessi
         }
         final RoutingResult<MessageTransferMessage> result =
                 exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
-        int enqueues = result.send(_transaction, _checkCapacityAction);
-        getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
-        incrementOutstandingTxnsIfNecessary();
-        incrementUncommittedMessageSize(message.getStoredMessage());
-        return enqueues;
+        if (result.isRoutingFailure())
+        {
+            org.apache.qpid.server.transport.ExecutionException ex = new org.apache.qpid.server.transport.ExecutionException();
+            ex.setErrorCode(ExecutionErrorCode.get(result.getErrorCodeAmqp_0_10()));
+            ex.setCommandId((int) message.getMessageNumber());
+            ex.setDescription(result.getErrorMessage());
+            invoke(ex);
+            close(ExecutionErrorCode.get(result.getErrorCodeAmqp_0_10()).getValue(), result.getErrorMessage());
+            return 0;
+        }
+        else
+        {
+            int enqueues = result.send(_transaction, _checkCapacityAction);
+            getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
+            incrementOutstandingTxnsIfNecessary();
+            incrementUncommittedMessageSize(message.getStoredMessage());
+            return enqueues;
+        }
     }
 
     private void resetUncommittedMessages()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Thu Feb 23 17:10:29 2017
@@ -106,20 +106,30 @@ public class NodeReceivingDestination im
                 }};
 
         RoutingResult result = _destination.route(message, routingAddress, instanceProperties);
-        int enqueues = result.send(txn, action);
-
-        if(enqueues == 0)
+        if (result.isRoutingFailure())
         {
-            _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
+            return createdRejectedOutcome(AmqpError.valueOf(result.getErrorCodeAmqp_1_0()),
+                    result.getErrorMessage());
         }
+        else
+        {
+            int enqueues = result.send(txn, action);
 
-        return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(routingAddress) : ACCEPTED;
+            if(enqueues == 0)
+            {
+                _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
+            }
+
+            return enqueues == 0 && !_discardUnroutable ?
+                    createdRejectedOutcome(AmqpError.NOT_FOUND,
+                            "Unknown destination '" + routingAddress + '"') : ACCEPTED;
+        }
     }
 
-    private Outcome createdRejectedOutcome(String routingAddress)
+    private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
     {
         Rejected rejected = new Rejected();
-        final Error notFoundError = new Error(AmqpError.NOT_FOUND, "Unknown destination '" + routingAddress + '"');
+        final Error notFoundError = new Error(errorCode, errorMessage);
         rejected.setError(notFoundError);
         return rejected;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java Thu Feb 23 17:10:29 2017
@@ -149,6 +149,11 @@ public class AmqpError
         }
     }
 
+    public static AmqpError valueOf(String errorCode)
+    {
+        return valueOf(Symbol.valueOf(errorCode));
+    }
+
     public static AmqpError valueOf(Object obj)
     {
         Symbol val = (Symbol) obj;

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html Thu Feb 23 17:10:29 2017
@@ -138,6 +138,52 @@
                 </div>
             </div>
             <div class="clear">
+                <div class="formLabel-labelCell">Overflow policy:</div>
+                <div class="formLabel-controlCell">
+                    <select id="formAddQueue.overflowPolicy"
+                            dojoType="dijit.form.FilteringSelect"
+                            data-dojo-props="
+                              name: 'overflowPolicy',
+                              value: '',
+                              searchAttr: 'name',
+                              required: false,
+                              promptMessage: 'Overflow policy override. If not default, messages arriving will have overflow policy setting overridden',
+                              title: 'Enter overflow policy override'">
+                        <option value="NONE">None</option>
+                        <option value="RING">Ring</option>
+                    </select>
+                </div>
+            </div>
+
+            <div id="formAddQueueOverflowPolicy:RING" class="hidden overflowPolicySpecificDiv">
+                <div class="clear">
+                    <div class="formLabel-labelCell">Maximum count:</div>
+                    <div class="formLabel-controlCell">
+                        <input type="text" id="formAddQueueOverflowPolicy.maxCount"
+                               data-dojo-type="dijit/form/ValidationTextBox"
+                               data-dojo-props="
+                                  name: 'maxCount',
+                                  placeHolder: 'number of messages',
+                                  promptMessage: 'Maximum number of messages in the queue',
+                                  title: 'Enter the maximum number of messages in the queue',
+                                  trim: true"/>
+                    </div>
+                    <div class="formLabel-labelCell">Maximum size:</div>
+                    <div class="formLabel-controlCell">
+                        <input type="text" id="formAddQueueoverflowPolicy.maxSize"
+                               data-dojo-type="dijit/form/ValidationTextBox"
+                               data-dojo-props="
+                                  name: 'maxSize',
+                                  placeHolder: 'size of messages',
+                                  promptMessage: 'Maximum size of messages (including header) in the queue',
+                                  title: 'Enter the maximum size of messages (including header) in the queue',
+                                  trim: true"/>
+                    </div>
+                </div>
+                <div class="clear"></div>
+            </div>
+
+            <div class="clear">
                 <div class="formLabel-labelCell">Maximum Ttl:</div>
                 <div class="formLabel-controlCell">
                     <input type="text" id="formAddQueue.maximumMessageTtl"

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Thu Feb 23 17:10:29 2017
@@ -386,6 +386,8 @@ define(["dojo/_base/declare",
                         "exclusive",
                         "owner",
                         "lifetimePolicy",
+                        "overflowPolicy",
+                        "overflowPolicyQualifier",
                         "type",
                         "typeQualifier",
                         "alertRepeatGap",
@@ -404,6 +406,9 @@ define(["dojo/_base/declare",
                         "queueDepthMessages",
                         "queueDepthBytes",
                         "queueDepthBytesUnits",
+                        "queueDepthMessagesIncludingHeader",
+                        "queueDepthBytesIncludingHeader",
+                        "queueDepthBytesUnitsIncludingHeader",
                         "unacknowledgedMessages",
                         "unacknowledgedBytes",
                         "unacknowledgedBytesUnits",
@@ -476,6 +481,11 @@ define(["dojo/_base/declare",
             this.queueDepthBytes.innerHTML = "(" + bytesDepth.value;
             this.queueDepthBytesUnits.innerHTML = bytesDepth.units + ")";
 
+            this.queueDepthMessagesIncludingHeader.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));
+            bytesDepth = formatter.formatBytes(this.queueData["queueDepthBytesIncludingHeader"]);
+            this.queueDepthBytesIncludingHeader.innerHTML = "(" + bytesDepth.value;
+            this.queueDepthBytesUnitsIncludingHeader.innerHTML = bytesDepth.units + ")";
+
             this.unacknowledgedMessages.innerHTML = entities.encode(String(this.queueData["unacknowledgedMessages"]));
             bytesDepth = formatter.formatBytes(this.queueData["unacknowledgedBytes"]);
             this.unacknowledgedBytes.innerHTML = "(" + bytesDepth.value;
@@ -492,6 +502,18 @@ define(["dojo/_base/declare",
                                                                + ")");
             }
 
+            this["overflowPolicy"].innerHTML = entities.encode(this.queueData["overflowPolicy"]);
+            if (this.queueData["overflowPolicy"] == "NONE")
+            {
+                this.overflowPolicyQualifier.style.display = "none";
+            }
+            else if (this.queueData["overflowPolicy"] == "RING")
+            {
+                bytesDepth = formatter.formatBytes(this.queueData["maxSize"]);
+                this.overflowPolicyQualifier.innerHTML = "(Max count: " + entities.encode(String(this.queueData["maxCount"]))
+                                                    + ", Max Size: " + bytesDepth.value + " " + bytesDepth.units + ")";
+            }
+
             if (this.queueData["messageGroupKey"])
             {
                 this.messageGroupKey.innerHTML = entities.encode(String(this.queueData["messageGroupKey"]));

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js Thu Feb 23 17:10:29 2017
@@ -103,6 +103,27 @@ define(["dojo/dom",
         }
     });
 
+    var typeSelector = registry.byId("formAddQueue.overflowPolicy");
+    typeSelector.on("change", function (value)
+    {
+        query(".overflowPolicySpecificDiv")
+            .forEach(function (node, index, arr)
+            {
+                if (node.id === "formAddQueueOverflowPolicy:" + value)
+                {
+                    node.style.display = "block";
+                    if (addQueue.management)
+                    {
+                        util.applyMetadataToWidgets(node, "Queue", value, addQueue.management.metadata);
+                    }
+                }
+                else
+                {
+                    node.style.display = "none";
+                }
+            });
+    });
+
     theForm.on("submit", function (e)
     {
 

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html Thu Feb 23 17:10:29 2017
@@ -48,6 +48,11 @@
                 <div class="formLabel-labelCell">Persist Messages:</div>
                 <div class="messageDurability formValue-valueCell"></div>
             </div>
+            <div class="clear">
+                <div class="formLabel-labelCell">Overflow policy:</div>
+                <div class="overflowPolicy formValue-valueCell"></div>
+                <div class="overflowPolicyQualifier formValue-valueCell"></div>
+            </div>
         </div>
         <div class="alignRight">
             <div class="clear">
@@ -78,6 +83,15 @@
                 </div>
             </div>
             <div class="clear">
+                <div class="formLabel-labelCell">Size including header:</div>
+                <div class="formValue-valueCell">
+                    <span class="queueDepthMessagesIncludingHeader"></span>
+                    <span> msgs</span>
+                    <span class="queueDepthBytesIncludingHeader">(</span>
+                    <span class="queueDepthBytesUnitsIncludingHeader">)</span>
+                </div>
+            </div>
+            <div class="clear">
                 <div class="formLabel-labelCell">Pre-fetched:</div>
                 <div class="formValue-valueCell">
                     <span class="unacknowledgedMessages"></span>

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java Thu Feb 23 17:10:29 2017
@@ -38,6 +38,8 @@ import com.fasterxml.jackson.databind.Ob
 
 import org.apache.qpid.server.common.AMQPFilterTypes;
 import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
@@ -70,7 +72,6 @@ import org.apache.qpid.server.session.AM
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
 import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
@@ -106,6 +107,7 @@ public class VirtualHostMessageStoreTest
     private String durablePriorityQueueName = "MST-PriorityQueue-Durable";
     private String durableLastValueQueueName = "MST-LastValueQueue-Durable";
     private String durableQueueName = "MST-Queue-Durable";
+    private String durableQueueRingOverflowPolicy = "MST-Queue-Ring-OverflowPolicy";
     private String priorityQueueName = "MST-PriorityQueue";
     private String queueName = "MST-Queue";
 
@@ -235,7 +237,7 @@ public class VirtualHostMessageStoreTest
         validateMessageOnTopics(2, true);
 
         assertEquals("Not all queues correctly registered",
-                10, _virtualHost.getChildren(Queue.class).size());
+                11, _virtualHost.getChildren(Queue.class).size());
     }
 
     public void testMessagePersistence() throws Exception
@@ -319,7 +321,7 @@ public class VirtualHostMessageStoreTest
     public void testDurableQueueRemoval() throws Exception
     {
         //Register Durable Queue
-        createQueue(durableQueueName, false, true, false, false);
+        createQueue(durableQueueName, false, true, false, false, false);
 
         assertEquals("Incorrect number of queues registered before recovery",
                 1,  _virtualHost.getChildren(Queue.class).size());
@@ -438,7 +440,7 @@ public class VirtualHostMessageStoreTest
         //create durable queue and exchange, bind them
         Exchange<?>
                 exch = createExchange(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, directExchangeName, true);
-        createQueue(durableQueueName, false, true, false, false);
+        createQueue(durableQueueName, false, true, false, false, false);
         bindQueueToExchange(exch, directRouting, _virtualHost.getChildByName(Queue.class, durableQueueName), false);
 
         assertEquals("Incorrect number of bindings registered before recovery",
@@ -546,15 +548,16 @@ public class VirtualHostMessageStoreTest
 
     private void validateDurableQueueProperties()
     {
-        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName), true, true, false, false);
-        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName), true, true, false, false);
-        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueName), false, true, false, false);
-        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName), false, true, false, false);
-        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName), false, true, true, false);
-        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableLastValueQueueName), false, true, true, true);
+        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName), true, true, false, false, false);
+        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName), true, true, false, false, false);
+        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueName), false, true, false, false, false);
+        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName), false, true, false, false, false);
+        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName), false, true, true, false, false);
+        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableLastValueQueueName), false, true, true, true, false);
+        validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueRingOverflowPolicy), false, true, false, false, true);
     }
 
-    private void validateQueueProperties(Queue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+    private void validateQueueProperties(Queue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue, boolean ringPolicy)
     {
         if(usePriority || lastValueQueue)
         {
@@ -580,6 +583,7 @@ public class VirtualHostMessageStoreTest
         assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner());
         assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable());
         assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.getExclusive() != ExclusivityPolicy.NONE);
+        assertEquals("Queue overflow policy is not as expected for queue " + queue.getName(), ringPolicy, queue.getOverflowPolicy() == OverflowPolicy.RING);
     }
 
     /**
@@ -624,40 +628,43 @@ public class VirtualHostMessageStoreTest
     private void createAllQueues() throws Exception
     {
         //Register Durable Priority Queue
-        createQueue(durablePriorityQueueName, true, true, false, false);
+        createQueue(durablePriorityQueueName, true, true, false, false, false);
 
         //Register Durable Simple Queue
-        createQueue(durableQueueName, false, true, false, false);
+        createQueue(durableQueueName, false, true, false, false, false);
 
         //Register Durable Exclusive Simple Queue
-        createQueue(durableExclusiveQueueName, false, true, true, false);
+        createQueue(durableExclusiveQueueName, false, true, true, false, false);
 
         //Register Durable LastValue Queue
-        createQueue(durableLastValueQueueName, false, true, true, true);
+        createQueue(durableLastValueQueueName, false, true, true, true, false);
+
+        //Register Durable Queue with Ring Overflow Policy
+        createQueue(durableQueueRingOverflowPolicy, false, true, false, false, true);
 
         //Register NON-Durable Priority Queue
-        createQueue(priorityQueueName, true, false, false, false);
+        createQueue(priorityQueueName, true, false, false, false, false);
 
         //Register NON-Durable Simple Queue
-        createQueue(queueName, false, false, false, false);
+        createQueue(queueName, false, false, false, false, false);
     }
 
     private void createAllTopicQueues() throws Exception
     {
         //Register Durable Priority Queue
-        createQueue(durablePriorityTopicQueueName, true, true, false, false);
+        createQueue(durablePriorityTopicQueueName, true, true, false, false, false);
 
         //Register Durable Simple Queue
-        createQueue(durableTopicQueueName, false, true, false, false);
+        createQueue(durableTopicQueueName, false, true, false, false, false);
 
         //Register NON-Durable Priority Queue
-        createQueue(priorityTopicQueueName, true, false, false, false);
+        createQueue(priorityTopicQueueName, true, false, false, false, false);
 
         //Register NON-Durable Simple Queue
-        createQueue(topicQueueName, false, false, false, false);
+        createQueue(topicQueueName, false, false, false, false, false);
     }
 
-    private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+    private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue, boolean ringPolicy)
             throws Exception
     {
 
@@ -703,7 +710,7 @@ public class VirtualHostMessageStoreTest
                                          });
 
 
-        validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
+        validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue, ringPolicy);
     }
 
     private Map<String, Exchange<?>> createExchanges() throws Exception

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java Thu Feb 23 17:10:29 2017
@@ -88,7 +88,7 @@ public class QueuePolicyTest extends Qpi
     }
     
     /**
-     * Test Goal : To create a ring queue programitcally using the address string and observe
+     * Test Goal : To create a ring queue programmatically using the address string and observe
      *             that it works as expected.
      */
     public void testRingPolicy() throws Exception

Modified: qpid/java/trunk/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java010Excludes?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java010Excludes (original)
+++ qpid/java/trunk/test-profiles/Java010Excludes Thu Feb 23 17:10:29 2017
@@ -40,10 +40,7 @@ org.apache.qpid.test.unit.close.JavaServ
 //QPID-1864: rollback with subscriptions does not work in 0-10 yet
 org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
 
-//QPID-3422: test fails because ring queue is not implemented on Broker for Java
-org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode
-
-// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs. 
+// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs.
 org.apache.qpid.server.failover.FailoverMethodTest#*
 
 // QPID-3392: the Broker for Java does not yet implement exchange creation arguments

Modified: qpid/java/trunk/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaExcludes?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaExcludes (original)
+++ qpid/java/trunk/test-profiles/JavaExcludes Thu Feb 23 17:10:29 2017
@@ -20,8 +20,7 @@
 // QPID-1823: this takes ages to run
 org.apache.qpid.client.SessionCreateTest#*
 
-//QPID-2845: The queue policy types used by the C++ broker are not currently supported by the Apache Qpid Broker for Java
-org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
+//QPID-2845: The queue reject policy type used by the C++ broker is not currently supported by the Apache Qpid Broker for Java
 org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
 
 // Test runs for 2 minutes testing that subtraction works

Modified: qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes (original)
+++ qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes Thu Feb 23 17:10:29 2017
@@ -49,12 +49,9 @@ qpid_tests.broker_0_10.extensions.Extens
 #The broker does not support the timed-autodelete extension
 qpid_tests.broker_0_10.extensions.ExtensionTests.test_timed_autodelete
 
-#The broker does not support ring queues, fairshare, or the priority alias
-qpid_tests.broker_0_10.priority.PriorityTests.test_ring_queue*
+#The broker does not support fairshare, or the priority alias
 qpid_tests.broker_0_10.priority.PriorityTests.test_fairshare*
 qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_alias
-#QPID-6299 broker does not support ring queue on lvq
-qpid_tests.broker_0_10.lvq.LVQTests.test_ring_lvq2
 
 #QPID-6572 These tests pass a empty binding key argument, which won't match with the Broker for Java's stricter impl.
 qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodeleteFanout



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org