You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/23 17:10:29 UTC
svn commit: r1784177 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/test/java/org/apache/q...
Author: orudyy
Date: Thu Feb 23 17:10:29 2017
New Revision: 1784177
URL: http://svn.apache.org/viewvc?rev=1784177&view=rev
Log:
QPID-7618: [Java Broker] Add ring policy support
Work done by Tomas Vavricka <va...@gmail.com>
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
qpid/java/trunk/test-profiles/Java010Excludes
qpid/java/trunk/test-profiles/JavaExcludes
qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Thu Feb 23 17:10:29 2017
@@ -62,6 +62,12 @@ public abstract class AbstractServerMess
return _handle.getContentSize();
}
+ @Override
+ public long getSizeIncludingHeader()
+ {
+ return _handle.getMetaData().getContentSize() + _handle.getMetaData().getStorableSize();
+ }
+
public StoredMessage<T> getStoredMessage()
{
return _handle;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java Thu Feb 23 17:10:29 2017
@@ -42,6 +42,14 @@ public class RoutingResult<M extends Ser
private final Set<BaseQueue> _queues = new HashSet<>();
+ private int _errorCodeAmqp_0_10;
+
+ private String _errorCodeAmqp_1_0;
+
+ private String _errorMessage;
+
+ private boolean _routingFailure = false;
+
public RoutingResult(final M message)
{
_message = message;
@@ -82,7 +90,17 @@ public class RoutingResult<M extends Ser
public void add(RoutingResult<M> result)
{
- addQueues(result._queues);
+ if (result.isRoutingFailure())
+ {
+ _routingFailure = result._routingFailure;
+ _errorCodeAmqp_0_10 = result._errorCodeAmqp_0_10;
+ _errorCodeAmqp_1_0 = result._errorCodeAmqp_1_0;
+ _errorMessage = result._errorMessage;
+ }
+ else
+ {
+ addQueues(result._queues);
+ }
}
public int send(ServerTransaction txn,
@@ -144,4 +162,32 @@ public class RoutingResult<M extends Ser
{
return !_queues.isEmpty();
}
+
+ public void addRoutingFailure(int errorCodeAmqp_0_10, String errorCodeAmqp_1_0, String reason)
+ {
+ _routingFailure = true;
+ this._errorCodeAmqp_0_10 = errorCodeAmqp_0_10;
+ this._errorCodeAmqp_1_0 = errorCodeAmqp_1_0;
+ _errorMessage = reason;
+ }
+
+ public String getErrorMessage()
+ {
+ return _errorMessage;
+ }
+
+ public int getErrorCodeAmqp_0_10()
+ {
+ return _errorCodeAmqp_0_10;
+ }
+
+ public String getErrorCodeAmqp_1_0()
+ {
+ return _errorCodeAmqp_1_0;
+ }
+
+ public boolean isRoutingFailure()
+ {
+ return _routingFailure;
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Thu Feb 23 17:10:29 2017
@@ -38,6 +38,8 @@ public interface ServerMessage<T extends
long getSize();
+ long getSizeIncludingHeader();
+
long getExpiration();
MessageReference newReference();
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java?rev=1784177&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java Thu Feb 23 17:10:29 2017
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public enum OverflowPolicy
+{
+ NONE,
+ RING
+}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Feb 23 17:10:29 2017
@@ -82,7 +82,9 @@ public interface Queue<X extends Queue<X
String DEFAULT_FILTERS = "defaultFilters";
String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
String HOLD_ON_PUBLISH_ENABLED = "holdOnPublishEnabled";
-
+ String OVERFLOW_POLICY = "overflowPolicy";
+ String MAX_COUNT = "maxCount";
+ String MAX_SIZE = "maxSize";
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@SuppressWarnings("unused")
@@ -259,6 +261,27 @@ public interface Queue<X extends Queue<X
+ "visible may depend on how frequently the virtual host housekeeping thread runs.")
boolean isHoldOnPublishEnabled();
+ @ManagedContextDefault( name = "queue.defaultMaxCount", description = "Maximum count of messages in queue when policy_type set to Ring")
+ long DEFAULT_MAX_COUNT = 10;
+
+ @ManagedAttribute( defaultValue = "${queue.defaultMaxCount}", description = "Maximum count of messages in queue, when policy_type set to Ring")
+ long getMaxCount();
+
+ @ManagedContextDefault( name = "queue.defaultMaxSize", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
+ long DEFAULT_MAX_SIZE = 1024;
+
+ @ManagedAttribute( defaultValue = "${queue.defaultMaxSize}", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
+ long getMaxSize();
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = "queue.defaultOverflowPolicy")
+ OverflowPolicy DEFAULT_POLICY_TYPE = OverflowPolicy.NONE;
+
+ @ManagedAttribute( defaultValue = "${queue.defaultOverflowPolicy}", description = "Queue overflow policy. Current options are Ring and None." +
+ " Ring overflow policy - when queue message count or size of messages in queue exceeds maximum, oldest message(s) is discarded." +
+ " None overflow policy - maximum size and maximum count properties are not applied.")
+ OverflowPolicy getOverflowPolicy();
+
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
Collection<PublishingLink> getPublishingLinks();
@@ -274,6 +297,7 @@ public interface Queue<X extends Queue<X
int deleteAndReturnCount();
+ void deleteEntry(QueueEntry node);
void setNotificationListener(QueueNotificationListener listener);
@@ -309,6 +333,9 @@ public interface Queue<X extends Queue<X
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth")
long getQueueDepthBytes();
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, label = "Queue Depth Including Header")
+ long getQueueDepthBytesIncludingHeader();
+
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Queue Depth")
int getQueueDepthMessages();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Feb 23 17:10:29 2017
@@ -191,6 +191,9 @@ public abstract class AbstractQueue<X ex
private ExclusivityPolicy _exclusive;
@ManagedAttributeField
+ private OverflowPolicy _overflowPolicy;
+
+ @ManagedAttributeField
private MessageDurability _messageDurability;
@ManagedAttributeField
@@ -256,7 +259,10 @@ public abstract class AbstractQueue<X ex
private boolean _ensureNondestructiveConsumers;
@ManagedAttributeField
private volatile boolean _holdOnPublishEnabled;
-
+ @ManagedAttributeField
+ private long _maxCount;
+ @ManagedAttributeField
+ private long _maxSize;
private static final int RECOVERING = 1;
private static final int COMPLETING_RECOVERY = 2;
@@ -674,6 +680,18 @@ public abstract class AbstractQueue<X ex
}
@Override
+ public long getMaxCount()
+ {
+ return _maxCount;
+ }
+
+ @Override
+ public long getMaxSize()
+ {
+ return _maxSize;
+ }
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -1140,6 +1158,7 @@ public abstract class AbstractQueue<X ex
protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
+ applyOverflowPolicy(message);
final QueueEntry entry = getEntries().add(message, enqueueRecord);
updateExpiration(entry);
@@ -1163,6 +1182,31 @@ public abstract class AbstractQueue<X ex
}
+ private void applyOverflowPolicy(final ServerMessage message)
+ {
+ if (getOverflowPolicy() != null)
+ {
+ switch (getOverflowPolicy())
+ {
+ case RING:
+ while (getQueueDepthMessages() == getMaxCount())
+ {
+ QueueEntry entry = getEntries().getOldestEntry();
+ deleteEntry(entry);
+ }
+ while (getQueueDepthBytesIncludingHeader() + message.getSizeIncludingHeader() > getMaxSize())
+ {
+ QueueEntry entry = getEntries().getOldestEntry();
+ deleteEntry(entry);
+ }
+ break;
+ case NONE:
+ default:
+ break;
+ }
+ }
+ }
+
private void updateExpiration(final QueueEntry entry)
{
long expiration = entry.getMessage().getExpiration();
@@ -1336,6 +1380,12 @@ public abstract class AbstractQueue<X ex
}
@Override
+ public long getQueueDepthBytesIncludingHeader()
+ {
+ return _queueStatistics.getQueueSizeIncludingHeader();
+ }
+
+ @Override
public long getAvailableBytes()
{
return _queueStatistics.getAvailableSize();
@@ -1621,6 +1671,25 @@ public abstract class AbstractQueue<X ex
});
}
+ public void deleteEntry(final QueueEntry entry)
+ {
+ boolean acquiredForDequeueing = entry.acquireOrSteal(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ dequeueEntry(entry);
+ }
+ });
+
+ if(acquiredForDequeueing)
+ {
+ _logger.debug("Dequeuing expired node {}", entry);
+ // Then dequeue it.
+ dequeueEntry(entry);
+ }
+ }
+
@Override
public void addDeleteTask(final Action<? super X> task)
{
@@ -2086,21 +2155,7 @@ public abstract class AbstractQueue<X ex
// If the node has expired then acquire it
if (node.expired())
{
- boolean acquiredForDequeueing = node.acquireOrSteal(new Runnable()
- {
- @Override
- public void run()
- {
- dequeueEntry(node);
- }
- });
-
- if(acquiredForDequeueing)
- {
- _logger.debug("Dequeuing expired node {}", node);
- // Then dequeue it.
- dequeueEntry(node);
- }
+ deleteEntry(node);
}
else
{
@@ -2581,7 +2636,32 @@ public abstract class AbstractQueue<X ex
throw new VirtualHostUnavailableException(this._virtualHost);
}
RoutingResult<M> result = new RoutingResult<>(message);
- if(message.isResourceAcceptable(this) && !message.isReferenced(this))
+
+ if(_overflowPolicy == OverflowPolicy.RING && message.getSizeIncludingHeader() > getMaxSize())
+ {
+ result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Message larger than configured maximum depth on " + this.getName()
+ + ": size=" + message.getSizeIncludingHeader() + ", max-size=" + getMaxSize());
+ }
+ else if(_overflowPolicy == OverflowPolicy.RING && _maxCount == 0)
+ {
+ result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Queue '" + getName() + "' maximum count (0) prevents this message from being accepted");
+ }
+ else if (this instanceof PriorityQueue && message.isResourceAcceptable(this) && !message.isReferenced(this))
+ {
+ if (getMaxCount() == getQueueDepthMessages())
+ {
+ QueueEntry entry = getEntries().getOldestEntry();
+ if(entry.getMessage().getMessageHeader().getPriority() <= message.getMessageHeader().getPriority())
+ {
+ result.addQueue(this);
+ }
+ }
+ else
+ {
+ result.addQueue(this);
+ }
+ }
+ else if(message.isResourceAcceptable(this) && !message.isReferenced(this))
{
result.addQueue(this);
}
@@ -2874,6 +2954,12 @@ public abstract class AbstractQueue<X ex
}
@Override
+ public OverflowPolicy getOverflowPolicy()
+ {
+ return _overflowPolicy;
+ }
+
+ @Override
public boolean isNoLocal()
{
return _noLocal;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java Thu Feb 23 17:10:29 2017
@@ -48,6 +48,7 @@ abstract class AbstractQueueEntryList im
final long size = entry.getSize();
final QueueStatistics queueStatistics = _queueStatistics;
queueStatistics.addToAvailable(size);
+ queueStatistics.addToDepthIncludingHeader(entry.getSizeWithHeader());
queueStatistics.addToQueue(size);
queueStatistics.addToEnqueued(size);
if(_forcePersistent || (_respectPersistent && entry.getMessage().isPersistent()))
@@ -89,6 +90,7 @@ abstract class AbstractQueueEntryList im
break;
case DELETED:
queueStatistics.removeFromQueue(size);
+ queueStatistics.removeFromDepthIncludingHeader(entry.getSizeWithHeader());
queueStatistics.addToDequeued(size);
if(_forcePersistent || (_respectPersistent && entry.isPersistent()))
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Thu Feb 23 17:10:29 2017
@@ -207,9 +207,9 @@ abstract public class PriorityQueueList
for(PriorityQueueEntrySubList subList : _priorityLists)
{
QueueEntry subListOldest = subList.getOldestEntry();
- if(oldest == null || (subListOldest != null && subListOldest.getMessage().getMessageNumber() < oldest.getMessage().getMessageNumber()))
+ if (subListOldest != null)
{
- oldest = subListOldest;
+ return subListOldest;
}
}
return oldest;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Thu Feb 23 17:10:29 2017
@@ -25,6 +25,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@@ -65,6 +66,11 @@ public class QueueArgumentsConverter
public static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
public static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
+
+ public static final String QPID_POLICY_TYPE = "qpid.policy_type";
+ public static final String QPID_MAX_COUNT = "qpid.max_count";
+ public static final String QPID_MAX_SIZE = "qpid.max_size";
+
/**
* No-local queue argument is used to support the no-local feature of Durable Subscribers.
*/
@@ -109,6 +115,9 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_EXCLUSIVITY_POLICY, Queue.EXCLUSIVE);
ATTRIBUTE_MAPPINGS.put(QPID_LIFETIME_POLICY, Queue.LIFETIME_POLICY);
+ ATTRIBUTE_MAPPINGS.put(QPID_POLICY_TYPE, Queue.OVERFLOW_POLICY);
+ ATTRIBUTE_MAPPINGS.put(QPID_MAX_COUNT, Queue.MAX_COUNT);
+ ATTRIBUTE_MAPPINGS.put(QPID_MAX_SIZE, Queue.MAX_SIZE);
}
@@ -128,6 +137,10 @@ public class QueueArgumentsConverter
{
modelArguments.put(LastValueQueue.LVQ_KEY, LastValueQueue.DEFAULT_LVQ_KEY);
}
+ if(wireArguments.containsKey(QPID_POLICY_TYPE))
+ {
+ modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
+ }
if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
{
modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Feb 23 17:10:29 2017
@@ -31,6 +31,8 @@ public interface QueueEntry extends Mess
long getSize();
+ long getSizeWithHeader();
+
boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
boolean isQueueDeleted();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Feb 23 17:10:29 2017
@@ -195,6 +195,11 @@ public abstract class QueueEntryImpl imp
return getMessage() == null ? 0 : getMessage().getSize();
}
+ public long getSizeWithHeader()
+ {
+ return getMessage() == null ? 0 : getMessage().getSizeIncludingHeader();
+ }
+
public boolean getDeliveredToConsumer()
{
return _deliveryCountUpdater.get(this) != -1;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java Thu Feb 23 17:10:29 2017
@@ -27,6 +27,7 @@ final class QueueStatistics
{
private final AtomicInteger _queueCount = new AtomicInteger();
private final AtomicLong _queueSize = new AtomicLong();
+ private final AtomicLong _queueSizeIncludingHeader = new AtomicLong();
private final AtomicInteger _unackedCount = new AtomicInteger();
private final AtomicLong _unackedSize = new AtomicLong();
@@ -62,6 +63,11 @@ final class QueueStatistics
return _queueSize.get();
}
+ public long getQueueSizeIncludingHeader()
+ {
+ return _queueSizeIncludingHeader.get();
+ }
+
public final int getUnackedCount()
{
return _unackedCount.get();
@@ -164,6 +170,16 @@ final class QueueStatistics
_queueSize.addAndGet(-size);
}
+ public void addToDepthIncludingHeader(long size)
+ {
+ _queueSizeIncludingHeader.addAndGet(size);
+ }
+
+ public void removeFromDepthIncludingHeader(long size)
+ {
+ _queueSizeIncludingHeader.addAndGet(-size);
+ }
+
void addToAvailable(long size)
{
int count = _availableCount.incrementAndGet();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Feb 23 17:10:29 2017
@@ -57,12 +57,14 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
+import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
@@ -840,6 +842,153 @@ abstract class AbstractQueueTestBase ext
assertEquals(10l,queue.getOldestMessageArrivalTime());
}
+ public void testNoneOverflowPolicy()
+ {
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.MAX_COUNT, 2);
+ attributes.put(Queue.MAX_SIZE, 100);
+
+ Queue<?> queue = getQueue();
+ queue.setAttributes(attributes);
+
+ ServerMessage message = createMessage(new Long(24), 50, 50);
+ when(message.getArrivalTime()).thenReturn(10l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(25), 50, 50);
+ when(message.getArrivalTime()).thenReturn(50l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(26), 50, 50);
+ when(message.getArrivalTime()).thenReturn(200l);
+ queue.enqueue(message, null, null);
+
+ assertEquals("Wrong number of messages in queue",3, queue.getQueueDepthMessages());
+ assertEquals("Wrong size of messages in queue",300, queue.getQueueDepthBytesIncludingHeader());
+ assertEquals("Wrong oldest message", 10l,
+ ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
+ queue.clearQueue();
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+ attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
+ queue.setAttributes(attributes);
+ }
+
+ public void testRingOverflowPolicyMaxCount()
+ {
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+ attributes.put(Queue.MAX_COUNT, 4);
+
+ Queue<?> queue = getQueue();
+ queue.setAttributes(attributes);
+
+ ServerMessage message = createMessage(new Long(24), 10, 10);
+ when(message.getArrivalTime()).thenReturn(10l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(25), 10, 10);
+ when(message.getArrivalTime()).thenReturn(50l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(26), 10, 10);
+ when(message.getArrivalTime()).thenReturn(200l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(27), 10, 10);
+ when(message.getArrivalTime()).thenReturn(500l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(28), 10, 10);
+ when(message.getArrivalTime()).thenReturn(1000l);
+ queue.enqueue(message, null, null);
+
+ assertEquals("Wrong number of messages in queue",4, queue.getQueueDepthMessages());
+ assertEquals("Wrong size of messages in queue",80, queue.getQueueDepthBytesIncludingHeader());
+ assertEquals("Wrong oldest message", 50l,
+ ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
+ queue.clearQueue();
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
+ attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+ queue.setAttributes(attributes);
+ }
+
+ public void testRingOverflowPolicyMaxSize()
+ {
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+ attributes.put(Queue.MAX_COUNT, 4);
+ attributes.put(Queue.MAX_SIZE, 100);
+
+ Queue<?> queue = getQueue();
+ queue.setAttributes(attributes);
+
+ ServerMessage message = createMessage(new Long(24), 10, 10);
+ when(message.getArrivalTime()).thenReturn(10l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(25), 10, 10);
+ when(message.getArrivalTime()).thenReturn(50l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(26), 20, 10);
+ when(message.getArrivalTime()).thenReturn(200l);
+ queue.enqueue(message, null, null);
+ message = createMessage(new Long(27), 20, 10);
+ when(message.getArrivalTime()).thenReturn(200l);
+ queue.enqueue(message, null, null);
+
+ assertEquals("Wrong number of messages in queue",4, queue.getQueueDepthMessages());
+ assertEquals("Wrong size of messages in queue",100, queue.getQueueDepthBytesIncludingHeader());
+
+ message = createMessage(new Long(27), 20, 10);
+ when(message.getArrivalTime()).thenReturn(500l);
+ queue.enqueue(message, null, null);
+
+ assertEquals("Wrong number of messages in queue",3, queue.getQueueDepthMessages());
+ assertEquals("Wrong size of messages in queue",90, queue.getQueueDepthBytesIncludingHeader());
+ assertEquals("Wrong oldest message", 200l,
+ ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
+ queue.clearQueue();
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
+ attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+ attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
+ queue.setAttributes(attributes);
+ }
+
+ public void testRingOverflowPolicyMessagesRejected()
+ {
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+ attributes.put(Queue.MAX_COUNT, 0);
+
+ Queue<?> queue = getQueue();
+ queue.setAttributes(attributes);
+
+ ServerMessage message;
+ RoutingResult result;
+
+ message = createMessage(new Long(27), 20, 10);
+ result = queue.route(message, message.getInitialRoutingAddress(), null);
+ assertTrue("Result should include routing failure", result.isRoutingFailure());
+
+ int headerSize = 20;
+ int payloadSize = 10;
+ int id = 28;
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.MAX_COUNT, 10);
+ attributes.put(Queue.MAX_SIZE, 10);
+ queue.setAttributes(attributes);
+
+ message = createMessage(new Long(id), headerSize, payloadSize);
+ result = queue.route(message, message.getInitialRoutingAddress(), null);
+ assertTrue("Result should include routing failure", result.isRoutingFailure());
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
+ attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
+ attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
+ queue.setAttributes(attributes);
+ }
+
private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
@@ -1017,6 +1166,13 @@ abstract class AbstractQueueTestBase ext
}
+ /**
+ * Creates a message through {@link #createMessage(Long)} and stubs its
+ * {@code getSizeIncludingHeader()} to return {@code headerSize + payloadSize}.
+ *
+ * @param id identifier passed on to {@code createMessage(Long)}
+ * @param headerSize header part of the reported size
+ * @param payloadSize payload part of the reported size
+ * @return the stubbed message
+ */
+ protected ServerMessage createMessage(Long id, final int headerSize, final int payloadSize)
+ {
+ ServerMessage message = createMessage(id);
+ // Widen before adding so the sum cannot overflow int; valueOf replaces the deprecated constructor.
+ when(message.getSizeIncludingHeader()).thenReturn(Long.valueOf((long) headerSize + payloadSize));
+ return message;
+ }
+
protected ServerMessage createMessage(Long id)
{
AMQMessageHeader header = mock(AMQMessageHeader.class);
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Thu Feb 23 17:10:29 2017
@@ -154,6 +154,12 @@ public class TestMessageMetaDataType imp
}
@Override
+ public long getSizeIncludingHeader()
+ {
+ // This test message type always reports a size of zero.
+ return 0;
+ }
+
+ @Override
public StoredMessage<TestMessageMetaData> getStoredMessage()
{
return _storedMsg;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Thu Feb 23 17:10:29 2017
@@ -85,6 +85,12 @@ class MockServerMessage implements Serve
}
@Override
+ public long getSizeIncludingHeader()
+ {
+ // Not supported by this mock: calling it always throws.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public String getInitialRoutingAddress()
{
throw new UnsupportedOperationException();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Feb 23 17:10:29 2017
@@ -1007,11 +1007,24 @@ public class ServerSession extends Sessi
}
final RoutingResult<MessageTransferMessage> result =
exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
- int enqueues = result.send(_transaction, _checkCapacityAction);
- getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
- incrementOutstandingTxnsIfNecessary();
- incrementUncommittedMessageSize(message.getStoredMessage());
- return enqueues;
+ if (result.isRoutingFailure())
+ {
+ org.apache.qpid.server.transport.ExecutionException ex = new org.apache.qpid.server.transport.ExecutionException();
+ ex.setErrorCode(ExecutionErrorCode.get(result.getErrorCodeAmqp_0_10()));
+ ex.setCommandId((int) message.getMessageNumber());
+ ex.setDescription(result.getErrorMessage());
+ invoke(ex);
+ close(ExecutionErrorCode.get(result.getErrorCodeAmqp_0_10()).getValue(), result.getErrorMessage());
+ return 0;
+ }
+ else
+ {
+ int enqueues = result.send(_transaction, _checkCapacityAction);
+ getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
+ incrementOutstandingTxnsIfNecessary();
+ incrementUncommittedMessageSize(message.getStoredMessage());
+ return enqueues;
+ }
}
private void resetUncommittedMessages()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Thu Feb 23 17:10:29 2017
@@ -106,20 +106,30 @@ public class NodeReceivingDestination im
}};
RoutingResult result = _destination.route(message, routingAddress, instanceProperties);
- int enqueues = result.send(txn, action);
-
- if(enqueues == 0)
+ if (result.isRoutingFailure())
{
- _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
+ return createdRejectedOutcome(AmqpError.valueOf(result.getErrorCodeAmqp_1_0()),
+ result.getErrorMessage());
}
+ else
+ {
+ int enqueues = result.send(txn, action);
- return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(routingAddress) : ACCEPTED;
+ if(enqueues == 0)
+ {
+ _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
+ }
+
+ // A message that was enqueued nowhere is rejected unless unroutable messages are discarded.
+ return enqueues == 0 && !_discardUnroutable ?
+ createdRejectedOutcome(AmqpError.NOT_FOUND,
+ "Unknown destination '" + routingAddress + "'") : ACCEPTED;
+ }
}
- private Outcome createdRejectedOutcome(String routingAddress)
+ private Outcome createdRejectedOutcome(AmqpError errorCode, String errorMessage)
{
Rejected rejected = new Rejected();
- final Error notFoundError = new Error(AmqpError.NOT_FOUND, "Unknown destination '" + routingAddress + '"');
+ final Error notFoundError = new Error(errorCode, errorMessage);
rejected.setError(notFoundError);
return rejected;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java Thu Feb 23 17:10:29 2017
@@ -149,6 +149,11 @@ public class AmqpError
}
}
+ /**
+ * Looks up the error for an error code given as a string: the string is converted to a
+ * {@code Symbol} and the lookup is delegated to {@code valueOf(Object)}.
+ */
+ public static AmqpError valueOf(String errorCode)
+ {
+ final Symbol symbol = Symbol.valueOf(errorCode);
+ return valueOf(symbol);
+ }
+
public static AmqpError valueOf(Object obj)
{
Symbol val = (Symbol) obj;
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html Thu Feb 23 17:10:29 2017
@@ -138,6 +138,52 @@
</div>
</div>
<div class="clear">
+ <div class="formLabel-labelCell">Overflow policy:</div>
+ <div class="formLabel-controlCell">
+ <select id="formAddQueue.overflowPolicy"
+ dojoType="dijit.form.FilteringSelect"
+ data-dojo-props="
+ name: 'overflowPolicy',
+ value: '',
+ searchAttr: 'name',
+ required: false,
+ promptMessage: 'Overflow policy override. If not default, messages arriving will have overflow policy setting overridden',
+ title: 'Enter overflow policy override'">
+ <option value="NONE">None</option>
+ <option value="RING">Ring</option>
+ </select>
+ </div>
+ </div>
+
+ <div id="formAddQueueOverflowPolicy:RING" class="hidden overflowPolicySpecificDiv">
+ <div class="clear">
+ <div class="formLabel-labelCell">Maximum count:</div>
+ <div class="formLabel-controlCell">
+ <input type="text" id="formAddQueueOverflowPolicy.maxCount"
+ data-dojo-type="dijit/form/ValidationTextBox"
+ data-dojo-props="
+ name: 'maxCount',
+ placeHolder: 'number of messages',
+ promptMessage: 'Maximum number of messages in the queue',
+ title: 'Enter the maximum number of messages in the queue',
+ trim: true"/>
+ </div>
+ <div class="formLabel-labelCell">Maximum size:</div>
+ <div class="formLabel-controlCell">
+ <input type="text" id="formAddQueueOverflowPolicy.maxSize"
+ data-dojo-type="dijit/form/ValidationTextBox"
+ data-dojo-props="
+ name: 'maxSize',
+ placeHolder: 'size of messages',
+ promptMessage: 'Maximum size of messages (including header) in the queue',
+ title: 'Enter the maximum size of messages (including header) in the queue',
+ trim: true"/>
+ </div>
+ </div>
+ <div class="clear"></div>
+ </div>
+
+ <div class="clear">
<div class="formLabel-labelCell">Maximum Ttl:</div>
<div class="formLabel-controlCell">
<input type="text" id="formAddQueue.maximumMessageTtl"
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Thu Feb 23 17:10:29 2017
@@ -386,6 +386,8 @@ define(["dojo/_base/declare",
"exclusive",
"owner",
"lifetimePolicy",
+ "overflowPolicy",
+ "overflowPolicyQualifier",
"type",
"typeQualifier",
"alertRepeatGap",
@@ -404,6 +406,9 @@ define(["dojo/_base/declare",
"queueDepthMessages",
"queueDepthBytes",
"queueDepthBytesUnits",
+ "queueDepthMessagesIncludingHeader",
+ "queueDepthBytesIncludingHeader",
+ "queueDepthBytesUnitsIncludingHeader",
"unacknowledgedMessages",
"unacknowledgedBytes",
"unacknowledgedBytesUnits",
@@ -476,6 +481,11 @@ define(["dojo/_base/declare",
this.queueDepthBytes.innerHTML = "(" + bytesDepth.value;
this.queueDepthBytesUnits.innerHTML = bytesDepth.units + ")";
+ this.queueDepthMessagesIncludingHeader.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));
+ bytesDepth = formatter.formatBytes(this.queueData["queueDepthBytesIncludingHeader"]);
+ this.queueDepthBytesIncludingHeader.innerHTML = "(" + bytesDepth.value;
+ this.queueDepthBytesUnitsIncludingHeader.innerHTML = bytesDepth.units + ")";
+
this.unacknowledgedMessages.innerHTML = entities.encode(String(this.queueData["unacknowledgedMessages"]));
bytesDepth = formatter.formatBytes(this.queueData["unacknowledgedBytes"]);
this.unacknowledgedBytes.innerHTML = "(" + bytesDepth.value;
@@ -492,6 +502,18 @@ define(["dojo/_base/declare",
+ ")");
}
+ this["overflowPolicy"].innerHTML = entities.encode(this.queueData["overflowPolicy"]);
+ if (this.queueData["overflowPolicy"] == "NONE")
+ {
+ this.overflowPolicyQualifier.style.display = "none";
+ }
+ else if (this.queueData["overflowPolicy"] == "RING")
+ {
+ bytesDepth = formatter.formatBytes(this.queueData["maxSize"]);
+ this.overflowPolicyQualifier.innerHTML = "(Max count: " + entities.encode(String(this.queueData["maxCount"]))
+ + ", Max Size: " + bytesDepth.value + " " + bytesDepth.units + ")";
+ }
+
if (this.queueData["messageGroupKey"])
{
this.messageGroupKey.innerHTML = entities.encode(String(this.queueData["messageGroupKey"]));
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js Thu Feb 23 17:10:29 2017
@@ -103,6 +103,27 @@ define(["dojo/dom",
}
});
+ // When the overflow policy selection changes, show only the policy-specific section
+ // matching the selected value and hide every other ".overflowPolicySpecificDiv".
+ var typeSelector = registry.byId("formAddQueue.overflowPolicy");
+ typeSelector.on("change", function (value)
+ {
+ query(".overflowPolicySpecificDiv")
+ .forEach(function (node, index, arr)
+ {
+ // Section ids have the form "formAddQueueOverflowPolicy:<selected value>".
+ if (node.id === "formAddQueueOverflowPolicy:" + value)
+ {
+ node.style.display = "block";
+ if (addQueue.management)
+ {
+ // NOTE(review): the selected overflow policy is passed as the third argument;
+ // confirm this is what applyMetadataToWidgets expects there.
+ util.applyMetadataToWidgets(node, "Queue", value, addQueue.management.metadata);
+ }
+ }
+ else
+ {
+ node.style.display = "none";
+ }
+ });
+ });
+
theForm.on("submit", function (e)
{
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html Thu Feb 23 17:10:29 2017
@@ -48,6 +48,11 @@
<div class="formLabel-labelCell">Persist Messages:</div>
<div class="messageDurability formValue-valueCell"></div>
</div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Overflow policy:</div>
+ <div class="overflowPolicy formValue-valueCell"></div>
+ <div class="overflowPolicyQualifier formValue-valueCell"></div>
+ </div>
</div>
<div class="alignRight">
<div class="clear">
@@ -78,6 +83,15 @@
</div>
</div>
<div class="clear">
+ <div class="formLabel-labelCell">Size including header:</div>
+ <div class="formValue-valueCell">
+ <span class="queueDepthMessagesIncludingHeader"></span>
+ <span> msgs</span>
+ <span class="queueDepthBytesIncludingHeader">(</span>
+ <span class="queueDepthBytesUnitsIncludingHeader">)</span>
+ </div>
+ </div>
+ <div class="clear">
<div class="formLabel-labelCell">Pre-fetched:</div>
<div class="formValue-valueCell">
<span class="unacknowledgedMessages"></span>
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java Thu Feb 23 17:10:29 2017
@@ -38,6 +38,8 @@ import com.fasterxml.jackson.databind.Ob
import org.apache.qpid.server.common.AMQPFilterTypes;
import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
@@ -70,7 +72,6 @@ import org.apache.qpid.server.session.AM
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost;
@@ -106,6 +107,7 @@ public class VirtualHostMessageStoreTest
private String durablePriorityQueueName = "MST-PriorityQueue-Durable";
private String durableLastValueQueueName = "MST-LastValueQueue-Durable";
private String durableQueueName = "MST-Queue-Durable";
+ private String durableQueueRingOverflowPolicy = "MST-Queue-Ring-OverflowPolicy";
private String priorityQueueName = "MST-PriorityQueue";
private String queueName = "MST-Queue";
@@ -235,7 +237,7 @@ public class VirtualHostMessageStoreTest
validateMessageOnTopics(2, true);
assertEquals("Not all queues correctly registered",
- 10, _virtualHost.getChildren(Queue.class).size());
+ 11, _virtualHost.getChildren(Queue.class).size());
}
public void testMessagePersistence() throws Exception
@@ -319,7 +321,7 @@ public class VirtualHostMessageStoreTest
public void testDurableQueueRemoval() throws Exception
{
//Register Durable Queue
- createQueue(durableQueueName, false, true, false, false);
+ createQueue(durableQueueName, false, true, false, false, false);
assertEquals("Incorrect number of queues registered before recovery",
1, _virtualHost.getChildren(Queue.class).size());
@@ -438,7 +440,7 @@ public class VirtualHostMessageStoreTest
//create durable queue and exchange, bind them
Exchange<?>
exch = createExchange(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, directExchangeName, true);
- createQueue(durableQueueName, false, true, false, false);
+ createQueue(durableQueueName, false, true, false, false, false);
bindQueueToExchange(exch, directRouting, _virtualHost.getChildByName(Queue.class, durableQueueName), false);
assertEquals("Incorrect number of bindings registered before recovery",
@@ -546,15 +548,16 @@ public class VirtualHostMessageStoreTest
private void validateDurableQueueProperties()
{
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName), true, true, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName), true, true, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueName), false, true, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName), false, true, false, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName), false, true, true, false);
- validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableLastValueQueueName), false, true, true, true);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityQueueName), true, true, false, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durablePriorityTopicQueueName), true, true, false, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueName), false, true, false, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableTopicQueueName), false, true, false, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableExclusiveQueueName), false, true, true, false, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableLastValueQueueName), false, true, true, true, false);
+ validateQueueProperties(_virtualHost.getChildByName(Queue.class, durableQueueRingOverflowPolicy), false, true, false, false, true);
}
- private void validateQueueProperties(Queue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+ private void validateQueueProperties(Queue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue, boolean ringPolicy)
{
if(usePriority || lastValueQueue)
{
@@ -580,6 +583,7 @@ public class VirtualHostMessageStoreTest
assertEquals("Queue owner is not as expected for queue " + queue.getName(), exclusive ? queueOwner : null, queue.getOwner());
assertEquals("Queue durability is not as expected for queue " + queue.getName(), durable, queue.isDurable());
assertEquals("Queue exclusivity is not as expected for queue " + queue.getName(), exclusive, queue.getExclusive() != ExclusivityPolicy.NONE);
+ assertEquals("Queue overflow policy is not as expected for queue " + queue.getName(), ringPolicy, queue.getOverflowPolicy() == OverflowPolicy.RING);
}
/**
@@ -624,40 +628,43 @@ public class VirtualHostMessageStoreTest
private void createAllQueues() throws Exception
{
//Register Durable Priority Queue
- createQueue(durablePriorityQueueName, true, true, false, false);
+ createQueue(durablePriorityQueueName, true, true, false, false, false);
//Register Durable Simple Queue
- createQueue(durableQueueName, false, true, false, false);
+ createQueue(durableQueueName, false, true, false, false, false);
//Register Durable Exclusive Simple Queue
- createQueue(durableExclusiveQueueName, false, true, true, false);
+ createQueue(durableExclusiveQueueName, false, true, true, false, false);
//Register Durable LastValue Queue
- createQueue(durableLastValueQueueName, false, true, true, true);
+ createQueue(durableLastValueQueueName, false, true, true, true, false);
+
+ //Register Durable Queue with Ring Overflow Policy
+ createQueue(durableQueueRingOverflowPolicy, false, true, false, false, true);
//Register NON-Durable Priority Queue
- createQueue(priorityQueueName, true, false, false, false);
+ createQueue(priorityQueueName, true, false, false, false, false);
//Register NON-Durable Simple Queue
- createQueue(queueName, false, false, false, false);
+ createQueue(queueName, false, false, false, false, false);
}
private void createAllTopicQueues() throws Exception
{
//Register Durable Priority Queue
- createQueue(durablePriorityTopicQueueName, true, true, false, false);
+ createQueue(durablePriorityTopicQueueName, true, true, false, false, false);
//Register Durable Simple Queue
- createQueue(durableTopicQueueName, false, true, false, false);
+ createQueue(durableTopicQueueName, false, true, false, false, false);
//Register NON-Durable Priority Queue
- createQueue(priorityTopicQueueName, true, false, false, false);
+ createQueue(priorityTopicQueueName, true, false, false, false, false);
//Register NON-Durable Simple Queue
- createQueue(topicQueueName, false, false, false, false);
+ createQueue(topicQueueName, false, false, false, false, false);
}
- private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
+ private void createQueue(String queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue, boolean ringPolicy)
throws Exception
{
@@ -703,7 +710,7 @@ public class VirtualHostMessageStoreTest
});
- validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
+ validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue, ringPolicy);
}
private Map<String, Exchange<?>> createExchanges() throws Exception
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java Thu Feb 23 17:10:29 2017
@@ -88,7 +88,7 @@ public class QueuePolicyTest extends Qpi
}
/**
- * Test Goal : To create a ring queue programitcally using the address string and observe
+ * Test Goal : To create a ring queue programmatically using the address string and observe
* that it works as expected.
*/
public void testRingPolicy() throws Exception
Modified: qpid/java/trunk/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java010Excludes?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java010Excludes (original)
+++ qpid/java/trunk/test-profiles/Java010Excludes Thu Feb 23 17:10:29 2017
@@ -40,10 +40,7 @@ org.apache.qpid.test.unit.close.JavaServ
//QPID-1864: rollback with subscriptions does not work in 0-10 yet
org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
-//QPID-3422: test fails because ring queue is not implemented on Broker for Java
-org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode
-
-// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs.
+// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occur.
org.apache.qpid.server.failover.FailoverMethodTest#*
// QPID-3392: the Broker for Java does not yet implement exchange creation arguments
Modified: qpid/java/trunk/test-profiles/JavaExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaExcludes?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaExcludes (original)
+++ qpid/java/trunk/test-profiles/JavaExcludes Thu Feb 23 17:10:29 2017
@@ -20,8 +20,7 @@
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-//QPID-2845: The queue policy types used by the C++ broker are not currently supported by the Apache Qpid Broker for Java
-org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
+//QPID-2845: The queue reject policy type used by the C++ broker is not currently supported by the Apache Qpid Broker for Java
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
// Test runs for 2 minutes testing that subtraction works
Modified: qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes?rev=1784177&r1=1784176&r2=1784177&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes (original)
+++ qpid/java/trunk/test-profiles/python_tests/Java010PythonExcludes Thu Feb 23 17:10:29 2017
@@ -49,12 +49,9 @@ qpid_tests.broker_0_10.extensions.Extens
#The broker does not support the timed-autodelete extension
qpid_tests.broker_0_10.extensions.ExtensionTests.test_timed_autodelete
-#The broker does not support ring queues, fairshare, or the priority alias
-qpid_tests.broker_0_10.priority.PriorityTests.test_ring_queue*
+#The broker does not support fairshare, or the priority alias
qpid_tests.broker_0_10.priority.PriorityTests.test_fairshare*
qpid_tests.broker_0_10.priority.PriorityTests.test_prioritised_delivery_with_alias
-#QPID-6299 broker does not support ring queue on lvq
-qpid_tests.broker_0_10.lvq.LVQTests.test_ring_lvq2
#QPID-6572 These tests pass a empty binding key argument, which won't match with the Broker for Java's stricter impl.
qpid_tests.broker_0_10.exchange.AutodeleteTests.testAutodeleteFanout
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org