You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/07/10 13:15:05 UTC
[2/2] qpid-broker-j git commit: QPID-7815: Add support for reject policy
QPID-7815: Add support for reject policy
The patch is based on work implemented by Tomas Vavricka
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c5e340f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c5e340f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c5e340f0
Branch: refs/heads/master
Commit: c5e340f0e85dd30947bc4270d14991036c95cbac
Parents: 8468aa0
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jul 10 14:12:51 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Jul 10 14:14:42 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 22 ++-
.../apache/qpid/server/message/RejectType.java | 28 ++++
.../qpid/server/message/RoutingResult.java | 86 ++++++-----
.../qpid/server/model/OverflowPolicy.java | 3 +-
.../org/apache/qpid/server/model/Queue.java | 20 ++-
.../apache/qpid/server/queue/AbstractQueue.java | 115 +++++++++-----
.../server/queue/CopyMessagesTransaction.java | 25 ++--
.../server/queue/DeleteMessagesTransaction.java | 3 +-
.../FlowToDiskOverflowPolicyHandlerFactory.java | 42 ------
.../queue/MessageUnacceptableException.java | 29 ++++
.../server/queue/MoveMessagesTransaction.java | 29 ++--
.../server/queue/NoneOverflowPolicyHandler.java | 3 +-
.../queue/NoneOverflowPolicyHandlerFactory.java | 43 ------
.../server/queue/OverflowPolicyHandler.java | 3 +-
.../queue/OverflowPolicyHandlerFactory.java | 29 ----
...FlowControlOverflowPolicyHandlerFactory.java | 42 ------
.../server/queue/QueueEntryTransaction.java | 8 +-
.../QueueSizeLimitRespectingTransaction.java | 77 ++++++++++
.../qpid/server/queue/RejectPolicyHandler.java | 98 ++++++++++++
.../queue/RingOverflowPolicyHandlerFactory.java | 43 ------
.../qpid/server/store/MemoryMessageStore.java | 22 +++
.../apache/qpid/server/store/MessageStore.java | 9 ++
.../qpid/server/store/NullMessageStore.java | 12 ++
.../server/queue/AbstractQueueTestBase.java | 65 +++++++-
.../server/queue/RejectPolicyHandlerTest.java | 142 ++++++++++++++++++
.../qpid/server/store/MessageStoreTestCase.java | 15 +-
.../server/protocol/v0_10/ServerSession.java | 10 +-
.../protocol/v0_10/ServerSessionDelegate.java | 32 +++-
.../qpid/server/protocol/v0_8/AMQChannel.java | 134 +++++++----------
.../protocol/v1_0/ExchangeDestination.java | 47 ++++--
.../protocol/v1_0/NodeReceivingDestination.java | 47 ++++--
.../store/jdbc/AbstractJDBCMessageStore.java | 20 +++
.../qpid/test/client/queue/QueuePolicyTest.java | 148 +++++++++++--------
test-profiles/Java10Excludes | 1 -
test-profiles/JavaExcludes | 3 -
test-profiles/JavaPre010Excludes | 2 -
36 files changed, 942 insertions(+), 515 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 7260557..e6262a6 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -60,7 +60,6 @@ import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.txn.Xid;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
@@ -71,6 +70,7 @@ import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.Xid;
import org.apache.qpid.server.util.CachingUUIDFactory;
@@ -107,6 +107,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private final AtomicLong _inMemorySize = new AtomicLong();
private final AtomicLong _bytesEvacuatedFromMemory = new AtomicLong();
private final Set<StoredBDBMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
@Override
public void upgradeStoreStructure() throws StoreException
@@ -1229,6 +1230,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
_messageDataRef = null;
_inMemorySize.addAndGet(-bytesCleared);
+ if (!_messageDeleteListeners.isEmpty())
+ {
+ for (final MessageDeleteListener messageDeleteListener : _messageDeleteListeners)
+ {
+ messageDeleteListener.messageDeleted(this);
+ }
+ }
}
@Override
@@ -1406,6 +1414,18 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
+ @Override
+ public void addMessageDeleteListener(final MessageDeleteListener listener)
+ {
+ _messageDeleteListeners.add(listener);
+ }
+
+ @Override
+ public void removeMessageDeleteListener(final MessageDeleteListener listener)
+ {
+ _messageDeleteListeners.remove(listener);
+ }
+
private static class BDBStoredXidRecord implements org.apache.qpid.server.store.Transaction.StoredXidRecord
{
private final long _format;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java b/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java
new file mode 100644
index 0000000..85c458b
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RejectType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public enum RejectType
+{
+ LIMIT_EXCEEDED,
+ ALREADY_ENQUEUED,
+ PRECONDITION_FAILED
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
index 29c31b3..55c63e6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
@@ -43,7 +43,7 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
private final M _message;
private final Set<BaseQueue> _queues = new HashSet<>();
- private final Map<BaseQueue, CharSequence> _notAcceptingRoutableQueues = new HashMap<>();
+ private final Map<BaseQueue, RejectReason> _rejectingRoutableQueues = new HashMap<>();
public RoutingResult(final M message)
{
@@ -86,11 +86,11 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
public void add(RoutingResult<M> result)
{
addQueues(result._queues);
- for (Map.Entry<BaseQueue, CharSequence> e : result._notAcceptingRoutableQueues.entrySet())
+ for (Map.Entry<BaseQueue, RejectReason> e : result._rejectingRoutableQueues.entrySet())
{
if (!e.getKey().isDeleted())
{
- _notAcceptingRoutableQueues.put(e.getKey(), e.getValue());
+ _rejectingRoutableQueues.put(e.getKey(), e.getValue());
}
}
}
@@ -98,31 +98,12 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
public int send(ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
- for(BaseQueue q : _queues)
+ if (containsReject(RejectType.LIMIT_EXCEEDED, RejectType.PRECONDITION_FAILED))
{
- if(!_message.isResourceAcceptable(q))
- {
- return 0;
- }
+ return 0;
}
- final BaseQueue[] baseQueues;
- if(_message.isReferenced())
- {
- ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(_queues.size());
- for(BaseQueue q : _queues)
- {
- if(!_message.isReferenced(q))
- {
- uniqueQueues.add(q);
- }
- }
- baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]);
- }
- else
- {
- baseQueues = _queues.toArray(new BaseQueue[_queues.size()]);
- }
+ final BaseQueue[] queues = _queues.toArray(new BaseQueue[_queues.size()]);
txn.enqueue(_queues, _message, new ServerTransaction.EnqueueAction()
{
MessageReference _reference = _message.newReference();
@@ -131,9 +112,9 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
{
try
{
- for(int i = 0; i < baseQueues.length; i++)
+ for(int i = 0; i < queues.length; i++)
{
- baseQueues[i].enqueue(_message, postEnqueueAction, records[i]);
+ queues[i].enqueue(_message, postEnqueueAction, records[i]);
}
}
finally
@@ -155,27 +136,64 @@ public class RoutingResult<M extends ServerMessage<? extends StorableMessageMeta
return !_queues.isEmpty();
}
- public void addNotAcceptingRoutableQueue(BaseQueue q, CharSequence reason)
+ public void addRejectReason(BaseQueue q, final RejectType rejectType, String reason)
+ {
+ _rejectingRoutableQueues.put(q, new RejectReason(rejectType, reason));
+ }
+
+ public boolean isRejected()
{
- _notAcceptingRoutableQueues.put(q, reason);
+ return !_rejectingRoutableQueues.isEmpty();
}
- public boolean hasNotAcceptingRoutableQueue()
+ public boolean containsReject(RejectType... type)
{
- return !_notAcceptingRoutableQueues.isEmpty();
+ for(RejectReason reason: _rejectingRoutableQueues.values())
+ {
+ for(RejectType t: type)
+ {
+ if (reason.getRejectType() == t)
+ {
+ return true;
+ }
+ }
+ }
+ return false;
}
- public String getUnacceptanceCause()
+ public String getRejectReason()
{
StringBuilder refusalMessages = new StringBuilder();
- for (CharSequence message : _notAcceptingRoutableQueues.values())
+ for (RejectReason reason : _rejectingRoutableQueues.values())
{
if (refusalMessages.length() > 0)
{
refusalMessages.append(";");
}
- refusalMessages.append(message);
+ refusalMessages.append(reason.getReason());
}
return refusalMessages.toString();
}
+
+ private static class RejectReason
+ {
+ private final RejectType _rejectType;
+ private final String _reason;
+
+ private RejectReason(final RejectType rejectType, final String reason)
+ {
+ _rejectType = rejectType;
+ _reason = reason;
+ }
+
+ private RejectType getRejectType()
+ {
+ return _rejectType;
+ }
+
+ public String getReason()
+ {
+ return _reason;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
index 57f334d..a742b78 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
@@ -25,5 +25,6 @@ public enum OverflowPolicy
NONE,
RING,
PRODUCER_FLOW_CONTROL,
- FLOW_TO_DISK
+ FLOW_TO_DISK,
+ REJECT
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 148291c..ae9bdf7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -275,17 +275,15 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
@ManagedAttribute(defaultValue = "${queue.defaultOverflowPolicy}",
description = "Queue overflow policy."
- + " Current options are ProducerFlowControl, Ring, FlowToDisk, and None."
- + " ProducerFlowControl overflow policy - when queue message number or size of messages"
- + " in queue exceeds maximum, the producing sessions are blocked until queue depth falls"
- + " below the resume threshold."
- + " Ring overflow policy - when queue message number or size of messages in queue exceeds"
- + " maximum, oldest messages are discarded."
- + " FlowToDisk overflow policy - when queue message number or size of messages"
- + " in queue exceeds maximum, new incoming messages are written to disk and immediately"
- + " evicted from memory."
- + " None overflow policy - queue capacity is unbounded, the attributes defining the limits for"
- + " maximum message number and maximum number of bytes are not applied.",
+ + " Options are ProducerFlowControl, Ring, FlowToDisk, Reject, and None."
+ + " The policy comes into effect where queue limits described by maximumQueueDepthBytes"
+ + " and/or maximumQueueDepthMessage are breached."
+ + " ProducerFlowControl - the producing sessions are blocked until queue size"
+ + " falls beneath resume threshold (see context variable queue.queueFlowResumeLimit)."
+ + " Ring - oldest messages are discarded."
+ + " Reject - incoming messages are rejected."
+ + " FlowToDisk - new incoming messages are written to disk and immediately evicted from memory."
+ + " None - queue capacity is unbounded.",
mandatory = true)
OverflowPolicy getOverflowPolicy();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6129402..bd2f6a3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -92,6 +92,7 @@ import org.apache.qpid.server.message.MessageInfoImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.RejectType;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
@@ -108,6 +109,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -240,7 +242,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private volatile boolean _holdOnPublishEnabled;
- @ManagedAttributeField
+ @ManagedAttributeField(afterSet = "postSetOverflowPolicy")
private OverflowPolicy _overflowPolicy;
@ManagedAttributeField
private long _maximumQueueDepthMessages;
@@ -264,7 +266,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
private AdvanceConsumersTask _queueHouseKeepingTask;
private volatile int _bindingCount;
- private volatile OverflowPolicyHandler _overflowPolicyHandler;
+ private volatile RejectPolicyHandler _rejectPolicyHandler;
+ private volatile OverflowPolicyHandler _postEnqueueOverflowPolicyHandler;
private long _flowToDiskThreshold;
private volatile MessageDestination _alternateBindingDestination;
@@ -358,8 +361,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
_logSubject = new QueueLogSubject(this);
- _overflowPolicyHandler = createOverflowPolicyHandler(getOverflowPolicy());
-
_queueHouseKeepingTask = new AdvanceConsumersTask();
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
@@ -576,6 +577,48 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
updateAlertChecks();
}
+ private void createOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
+ {
+ MessageStore messageStore = getVirtualHost().getMessageStore();
+
+ if (overflowPolicy == OverflowPolicy.REJECT)
+ {
+ _postEnqueueOverflowPolicyHandler = new NoneOverflowPolicyHandler();
+ _rejectPolicyHandler = new RejectPolicyHandler(this);
+ messageStore.addMessageDeleteListener(_rejectPolicyHandler);
+ }
+ else
+ {
+ if (_rejectPolicyHandler != null)
+ {
+ messageStore.removeMessageDeleteListener(_rejectPolicyHandler);
+ _rejectPolicyHandler = null;
+ }
+
+ OverflowPolicyHandler overflowPolicyHandler;
+ switch (overflowPolicy)
+ {
+ case RING:
+ overflowPolicyHandler = new RingOverflowPolicyHandler(this, getEventLogger());
+ break;
+ case PRODUCER_FLOW_CONTROL:
+ overflowPolicyHandler = new ProducerFlowControlOverflowPolicyHandler(this, getEventLogger());
+ break;
+ case FLOW_TO_DISK:
+ overflowPolicyHandler = new FlowToDiskOverflowPolicyHandler(this);
+ break;
+ case NONE:
+ overflowPolicyHandler = new NoneOverflowPolicyHandler();
+ break;
+ default:
+ throw new IllegalStateException(String.format("Overflow policy '%s' is not implemented",
+ overflowPolicy.name()));
+ }
+
+ _postEnqueueOverflowPolicyHandler = overflowPolicyHandler;
+ }
+ }
+
protected LogMessage getCreatedLogMessage()
{
String ownerString = getOwner();
@@ -1251,7 +1294,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
action.performAction(entry);
}
- _overflowPolicyHandler.checkOverflow(entry);
+
+ RejectPolicyHandler rejectPolicyHandler = _rejectPolicyHandler;
+ if (rejectPolicyHandler != null)
+ {
+ rejectPolicyHandler.postEnqueue(entry);
+ }
+ _postEnqueueOverflowPolicyHandler.checkOverflow(entry);
}
}
@@ -1841,7 +1890,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
public void checkCapacity()
{
- _overflowPolicyHandler.checkOverflow(null);
+ _postEnqueueOverflowPolicyHandler.checkOverflow(null);
}
void notifyConsumers(QueueEntry entry)
@@ -2609,15 +2658,31 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
RoutingResult<M> result = new RoutingResult<>(message);
if (!message.isResourceAcceptable(this))
{
- result.addNotAcceptingRoutableQueue(this, String.format("Not accepted by queue '%s'", getName()));
+ result.addRejectReason(this,
+ RejectType.PRECONDITION_FAILED,
+ String.format("Not accepted by queue '%s'", getName()));
}
else if (message.isReferenced(this))
{
- result.addNotAcceptingRoutableQueue(this, String.format("Already enqueued on queue '%s'", getName()));
+ result.addRejectReason(this,
+ RejectType.ALREADY_ENQUEUED,
+ String.format("Already enqueued on queue '%s'", getName()));
}
else
{
- result.addQueue(this);
+ try
+ {
+ RejectPolicyHandler rejectPolicyHandler = _rejectPolicyHandler;
+ if (rejectPolicyHandler != null)
+ {
+ rejectPolicyHandler.checkReject(message);
+ }
+ result.addQueue(this);
+ }
+ catch (MessageUnacceptableException e)
+ {
+ result.addRejectReason(this, RejectType.LIMIT_EXCEEDED, e.getMessage());
+ }
}
return result;
}
@@ -2946,9 +3011,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
public boolean isQueueFlowStopped()
{
- if (_overflowPolicyHandler instanceof ProducerFlowControlOverflowPolicyHandler)
+ if (_postEnqueueOverflowPolicyHandler instanceof ProducerFlowControlOverflowPolicyHandler)
{
- return ((ProducerFlowControlOverflowPolicyHandler)_overflowPolicyHandler).isQueueFlowStopped();
+ return ((ProducerFlowControlOverflowPolicyHandler) _postEnqueueOverflowPolicyHandler).isQueueFlowStopped();
}
return false;
}
@@ -2995,31 +3060,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
- @Override
- protected void changeAttributes(Map<String,Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
- {
- OverflowPolicy oldOverflowPolicy = getOverflowPolicy();
- super.changeAttributes(attributes);
-
- OverflowPolicy newOverflowPolicy = getOverflowPolicy();
- if (oldOverflowPolicy != newOverflowPolicy)
- {
- _overflowPolicyHandler = createOverflowPolicyHandler(newOverflowPolicy);
- _overflowPolicyHandler.checkOverflow(null);
- }
- }
-
- private OverflowPolicyHandler createOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
+ @SuppressWarnings("ignore")
+ private void postSetOverflowPolicy()
{
- OverflowPolicyHandlerFactory factory =
- new QpidServiceLoader().getInstancesByType(OverflowPolicyHandlerFactory.class)
- .get(String.valueOf(overflowPolicy));
- if (factory == null)
- {
- throw new IllegalStateException(String.format("Factory for overflow policy '%s' is not found",
- overflowPolicy.name()));
- }
- return factory.create(this, getEventLogger());
+ createOverflowPolicyHandler(getOverflowPolicy());
+ _postEnqueueOverflowPolicyHandler.checkOverflow(null);
}
private static final String[] NON_NEGATIVE_NUMBERS = {
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
index cf073f5..fb7ec30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
@@ -23,30 +23,25 @@ package org.apache.qpid.server.queue;
import java.util.List;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-public class CopyMessagesTransaction extends QueueEntryTransaction
+class CopyMessagesTransaction extends QueueSizeLimitRespectingTransaction
{
- private final Queue _destinationQueue;
- public CopyMessagesTransaction(Queue sourceQueue,
- List<Long> messageIds,
- Queue destinationQueue,
- final MessageFilter filter, final int limit)
+ CopyMessagesTransaction(Queue sourceQueue,
+ List<Long> messageIds,
+ Queue destinationQueue,
+ final MessageFilter filter, final int limit)
{
- super(sourceQueue, messageIds, filter, limit);
- _destinationQueue = destinationQueue;
+ super(sourceQueue, messageIds, destinationQueue, filter, limit);
}
@Override
- protected void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+ void performOperation(final QueueEntry entry,
+ final QueueManagingVirtualHost.Transaction txn,
+ final Queue destinationQueue)
{
- ServerMessage msg = entry.getMessage();
- if(msg != null && !msg.isReferenced(_destinationQueue))
- {
- txn.copy(entry, _destinationQueue);
- }
+ txn.copy(entry, destinationQueue);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
index 84aa0ed..1357ce7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
@@ -37,8 +37,9 @@ public class DeleteMessagesTransaction extends QueueEntryTransaction
}
@Override
- protected void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+ protected boolean updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
{
txn.dequeue(entry);
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
deleted file mode 100644
index abcf98d..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@SuppressWarnings("unused")
-@PluggableService
-public class FlowToDiskOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
- @Override
- public String getType()
- {
- return OverflowPolicy.FLOW_TO_DISK.name();
- }
-
- @Override
- public OverflowPolicyHandler create(final Queue<?> queue, final EventLogger eventLogger)
- {
- return new FlowToDiskOverflowPolicyHandler(queue);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java
new file mode 100644
index 0000000..48579c6
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageUnacceptableException.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public class MessageUnacceptableException extends Exception
+{
+ public MessageUnacceptableException(final String message)
+ {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
index 7e5ba68..4e42c08 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
@@ -23,30 +23,25 @@ package org.apache.qpid.server.queue;
import java.util.List;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-public class MoveMessagesTransaction extends QueueEntryTransaction
+public class MoveMessagesTransaction extends QueueSizeLimitRespectingTransaction
{
- private final Queue _destinationQueue;
-
- public MoveMessagesTransaction(Queue sourceQueue,
- List<Long> messageIds,
- Queue destinationQueue,
- final MessageFilter filter, final int limit)
+ MoveMessagesTransaction(Queue sourceQueue,
+ List<Long> messageIds,
+ Queue destinationQueue,
+ final MessageFilter filter,
+ final int limit)
{
- super(sourceQueue, messageIds, filter, limit);
- _destinationQueue = destinationQueue;
+ super(sourceQueue, messageIds, destinationQueue, filter, limit);
}
- @Override
- protected void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+
+ void performOperation(final QueueEntry entry,
+ final QueueManagingVirtualHost.Transaction txn,
+ final Queue destinationQueue)
{
- ServerMessage msg = entry.getMessage();
- if(msg != null && !msg.isReferenced(_destinationQueue))
- {
- txn.move(entry, _destinationQueue);
- }
+ txn.move(entry, destinationQueue);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
index 61314b4..1d7ed77 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
@@ -21,10 +21,9 @@ package org.apache.qpid.server.queue;
+/**
+ * {@link OverflowPolicyHandler} that takes no action when asked to check for overflow.
+ */
public class NoneOverflowPolicyHandler implements OverflowPolicyHandler
{
+
@Override
public void checkOverflow(final QueueEntry newlyEnqueued)
{
- // noop
}
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
deleted file mode 100644
index f11da7d..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@PluggableService
-public class NoneOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
-
- @Override
- public String getType()
- {
- return OverflowPolicy.NONE.name();
- }
-
- @Override
- public OverflowPolicyHandler create(final Queue<?> queue,
- final EventLogger eventLogger)
- {
- return new NoneOverflowPolicyHandler();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
index 8c221b3..d396985 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
@@ -18,10 +18,9 @@
* under the License.
*
*/
-
package org.apache.qpid.server.queue;
+/**
+ * Hook through which a queue's overflow policy reacts to an entry being added.
+ */
public interface OverflowPolicyHandler
{
- void checkOverflow(final QueueEntry newlyEnqueued);
+ /**
+ * @param newlyEnqueued the entry the check is made for (presumably the entry that has just been
+ * enqueued - confirm against the calling queue implementation)
+ */
+ void checkOverflow(QueueEntry newlyEnqueued);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
deleted file mode 100644
index bb416cf..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.Pluggable;
-
-public interface OverflowPolicyHandlerFactory extends Pluggable
-{
- OverflowPolicyHandler create(Queue<?> queue, final EventLogger eventLogger);
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
deleted file mode 100644
index 3219683..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@PluggableService
-public class ProducerFlowControlOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
- @Override
- public String getType()
- {
- return OverflowPolicy.PRODUCER_FLOW_CONTROL.name();
- }
-
- @Override
- public OverflowPolicyHandler create(final Queue<?> queue,
- final EventLogger eventLogger)
- {
- return new ProducerFlowControlOverflowPolicyHandler(queue, eventLogger);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
index 26de745..ddeee1f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
@@ -55,13 +55,14 @@ abstract class QueueEntryTransaction implements QueueManagingVirtualHost.Transac
public boolean visit(final QueueEntry entry)
{
final ServerMessage message = entry.getMessage();
+ boolean stop = false;
if (message != null)
{
final long messageId = message.getMessageNumber();
if ((_messageIds == null || _messageIds.remove(messageId))
&& (_filter == null || _filter.matches(entry.asFilterable())))
{
- updateEntry(entry, txn);
+ stop = updateEntry(entry, txn);
_modifiedMessageIds.add(messageId);
if (_limit > 0)
{
@@ -69,15 +70,14 @@ abstract class QueueEntryTransaction implements QueueManagingVirtualHost.Transac
}
}
}
- return _limit == 0 || (_messageIds != null && _messageIds.isEmpty());
+ return stop || _limit == 0 || (_messageIds != null && _messageIds.isEmpty());
}
});
}
}
-
- protected abstract void updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn);
+ protected abstract boolean updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn);
@Override
public final List<Long> getModifiedMessageIds()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java
new file mode 100644
index 0000000..f361466
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueSizeLimitRespectingTransaction.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.List;
+
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+
+/**
+ * Queue entry transaction that transfers selected entries to a destination queue while keeping the
+ * destination within its configured maximum depth. The transfer itself is supplied by subclasses
+ * through {@code performOperation}.
+ */
+abstract class QueueSizeLimitRespectingTransaction extends QueueEntryTransaction
+{
+    private final Queue _destinationQueue;
+    // Running totals of what this transaction has already considered for the destination.
+    private long _pendingQueueDepthBytes;
+    private long _pendingQueueDepthMessages;
+
+    QueueSizeLimitRespectingTransaction(Queue sourceQueue,
+                                        List<Long> messageIds,
+                                        Queue destinationQueue,
+                                        final MessageFilter filter,
+                                        final int limit)
+    {
+        super(sourceQueue, messageIds, filter, limit);
+        _destinationQueue = destinationQueue;
+    }
+
+    /**
+     * Accounts for the entry, then transfers it if the destination still has room and does not
+     * already reference the message.
+     *
+     * @return {@code true} when the destination has no room left
+     */
+    @Override
+    protected boolean updateEntry(QueueEntry entry, QueueManagingVirtualHost.Transaction txn)
+    {
+        final ServerMessage candidate = entry.getMessage();
+
+        // Every visited entry is counted, even one whose message has already gone.
+        _pendingQueueDepthMessages++;
+        if (candidate != null)
+        {
+            _pendingQueueDepthBytes += candidate.getSizeIncludingHeader();
+        }
+
+        final boolean hasRoom = isUnderfull();
+        final boolean transferable = candidate != null && !candidate.isReferenced(_destinationQueue);
+        if (transferable && hasRoom)
+        {
+            performOperation(entry, txn, _destinationQueue);
+        }
+
+        return !hasRoom;
+    }
+
+    /**
+     * Transfers a single entry to the destination queue within the given transaction.
+     */
+    abstract void performOperation(final QueueEntry entry,
+                                   final QueueManagingVirtualHost.Transaction txn,
+                                   final Queue destinationQueue);
+
+    /**
+     * Tells whether the destination can take everything counted so far. A destination with overflow
+     * policy NONE always can; otherwise both the byte and the message limit must hold, where a
+     * negative limit means "no limit".
+     */
+    private boolean isUnderfull()
+    {
+        if (_destinationQueue.getOverflowPolicy() == OverflowPolicy.NONE)
+        {
+            return true;
+        }
+
+        final boolean bytesFit =
+                _destinationQueue.getMaximumQueueDepthBytes() < 0
+                || _destinationQueue.getQueueDepthBytes() + _pendingQueueDepthBytes
+                   <= _destinationQueue.getMaximumQueueDepthBytes();
+        if (!bytesFit)
+        {
+            return false;
+        }
+
+        return _destinationQueue.getMaximumQueueDepthMessages() < 0
+               || _destinationQueue.getQueueDepthMessages() + _pendingQueueDepthMessages
+                  <= _destinationQueue.getMaximumQueueDepthMessages();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
new file mode 100644
index 0000000..528133a
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
+
+
+/**
+ * Decides whether a queue may accept a new message without exceeding its configured maximum depth.
+ * <p>
+ * A message accepted by {@link #checkReject(ServerMessage)} is tracked as "pending" (count and bytes)
+ * until {@link #postEnqueue(MessageInstance)} or {@link #messageDeleted(StoredMessage)} is invoked for
+ * it, so that accepted messages not yet reflected in the queue depth still count against the limits.
+ * The pending counters are atomics and the pending map is a concurrent map.
+ */
+public class RejectPolicyHandler implements MessageStore.MessageDeleteListener
+{
+    private final Queue<?> _queue;
+    private final AtomicLong _pendingDepthBytes = new AtomicLong();
+    private final AtomicInteger _pendingDepthMessages = new AtomicInteger();
+    // Size reserved for each accepted message that has not been released yet.
+    private final Map<StoredMessage<?>, Long> _pendingMessages = new ConcurrentHashMap<>();
+
+    /**
+     * @param queue the queue whose depth and depth limits are consulted
+     */
+    RejectPolicyHandler(final Queue<?> queue)
+    {
+        _queue = queue;
+    }
+
+    /**
+     * Releases the reservation held for a message that was deleted.
+     *
+     * @param m the deleted stored message
+     */
+    @Override
+    public void messageDeleted(final StoredMessage<?> m)
+    {
+        decrementPendingCountersIfNecessary(m);
+    }
+
+    /**
+     * Reserves room for the message, or refuses it when the queue depth plus everything pending would
+     * exceed a configured maximum. A negative maximum disables the corresponding check.
+     *
+     * @param newMessage the message about to be enqueued
+     * @throws MessageUnacceptableException if accepting the message would exceed the maximum number
+     *                                      of messages or bytes; nothing stays reserved in that case
+     */
+    void checkReject(final ServerMessage<?> newMessage) throws MessageUnacceptableException
+    {
+        final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+        final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+        final int queueDepthMessages = _queue.getQueueDepthMessages();
+        final long queueDepthBytes = _queue.getQueueDepthBytes();
+        final long messageSize = newMessage.getSizeIncludingHeader();
+
+        // Reserve first and test afterwards, so that concurrent callers see each other's reservations.
+        final int pendingMessages = _pendingDepthMessages.incrementAndGet();
+        final long pendingBytes = _pendingDepthBytes.addAndGet(messageSize);
+
+        final boolean messagesOverflow = maximumQueueDepthMessages >= 0
+                                         && queueDepthMessages + pendingMessages > maximumQueueDepthMessages;
+        final boolean bytesOverflow = maximumQueueDepthBytes >= 0
+                                      && queueDepthBytes + pendingBytes > maximumQueueDepthBytes;
+        if (bytesOverflow || messagesOverflow)
+        {
+            // Roll back exactly what was reserved above. The size must be subtracted here: adding it
+            // again would leave every rejected message permanently counted as pending bytes.
+            _pendingDepthBytes.addAndGet(-messageSize);
+            _pendingDepthMessages.decrementAndGet();
+            final String message = String.format(
+                    "Maximum depth exceeded on '%s' : current=[count: %d, size: %d], max=[count: %d, size: %d]",
+                    _queue.getName(),
+                    queueDepthMessages + pendingMessages,
+                    queueDepthBytes + pendingBytes,
+                    maximumQueueDepthMessages,
+                    maximumQueueDepthBytes);
+            throw new MessageUnacceptableException(message);
+        }
+
+        _pendingMessages.put(newMessage.getStoredMessage(), messageSize);
+    }
+
+    /**
+     * Releases the reservation held for the message of the given instance.
+     *
+     * @param instance instance whose message no longer needs to be counted as pending (presumably
+     *                 because the queue depth now includes it - confirm against the caller)
+     */
+    void postEnqueue(MessageInstance instance)
+    {
+        decrementPendingCountersIfNecessary(instance.getMessage().getStoredMessage());
+    }
+
+    /**
+     * Drops the reservation for the stored message, if one exists. Removing the map entry first
+     * ensures a reservation is released at most once.
+     */
+    private void decrementPendingCountersIfNecessary(final StoredMessage<?> m)
+    {
+        final Long size = _pendingMessages.remove(m);
+        if (size != null)
+        {
+            _pendingDepthBytes.addAndGet(-size);
+            _pendingDepthMessages.decrementAndGet();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
deleted file mode 100644
index 89a7b6b..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.plugin.PluggableService;
-
-@PluggableService
-public class RingOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
-{
-
- @Override
- public String getType()
- {
- return OverflowPolicy.RING.name();
- }
-
- @Override
- public OverflowPolicyHandler create(final Queue<?> queue,
- final EventLogger eventLogger)
- {
- return new RingOverflowPolicyHandler(queue, eventLogger);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index d9992ec..86342fa 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store;
import java.io.File;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -53,6 +54,8 @@ public class MemoryMessageStore implements MessageStore
private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
private final AtomicLong _inMemorySize = new AtomicLong();
+ private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
private final class MemoryMessageStoreTransaction implements Transaction
@@ -282,6 +285,18 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ // Adds the listener to the set of listeners that are told about message deletion.
+ public void addMessageDeleteListener(final MessageDeleteListener listener)
+ {
+ _messageDeleteListeners.add(listener);
+ }
+
+ @Override
+ // Removes the listener from the set of listeners that are told about message deletion.
+ public void removeMessageDeleteListener(final MessageDeleteListener listener)
+ {
+ _messageDeleteListeners.remove(listener);
+ }
+
+ @Override
public <T extends StorableMessageMetaData> MessageHandle<T> addMessage(final T metaData)
{
long id = getNextMessageId();
@@ -304,6 +319,13 @@ public class MemoryMessageStore implements MessageStore
int bytesCleared = metaData.getStorableSize() + metaData.getContentSize();
super.remove();
_inMemorySize.addAndGet(-bytesCleared);
+ if (!_messageDeleteListeners.isEmpty())
+ {
+ for (final MessageDeleteListener messageDeleteListener : _messageDeleteListeners)
+ {
+ messageDeleteListener.messageDeleted(this);
+ }
+ }
}
};
_inMemorySize.addAndGet(metaData.getStorableSize());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index b50dbb1..a6bf9cf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -81,6 +81,10 @@ public interface MessageStore
void onDelete(ConfiguredObject<?> parent);
+ void addMessageDeleteListener(MessageDeleteListener listener);
+
+ void removeMessageDeleteListener(MessageDeleteListener listener);
+
MessageStoreReader newMessageStoreReader();
interface MessageStoreReader
@@ -96,4 +100,9 @@ public interface MessageStore
void close();
}
+ /**
+ * Callback for code that needs to know when a stored message has been deleted.
+ */
+ interface MessageDeleteListener
+ {
+ /**
+ * @param m the stored message that was deleted
+ */
+ void messageDeleted(StoredMessage<?> m);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 37c79e5..e6f6e29 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -198,4 +198,16 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
{
}
+
+ @Override
+ public void addMessageDeleteListener(final MessageDeleteListener listener)
+ {
+ // Intentionally empty: the listener is ignored by this implementation.
+
+ }
+
+ @Override
+ public void removeMessageDeleteListener(final MessageDeleteListener listener)
+ {
+ // Intentionally empty: no listeners are ever registered by this implementation.
+
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index bdf4f31..163d54a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -47,11 +47,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.TestConsumerTarget;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.DirectExchangeImpl;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -64,9 +64,9 @@ import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
-import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -952,7 +952,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
message = createMessage(new Long(27), 20, 10);
result = queue.route(message, message.getInitialRoutingAddress(), null);
- assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
+ assertTrue("Result should include not accepting route", result.isRejected());
int headerSize = 20;
int payloadSize = 10;
@@ -965,7 +965,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
message = createMessage(new Long(id), headerSize, payloadSize);
result = queue.route(message, message.getInitialRoutingAddress(), null);
- assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
+ assertTrue("Result should include not accepting route", result.isRejected());
}
public void testAlternateBindingValidationRejectsNonExistingDestination()
@@ -1055,6 +1055,63 @@ abstract class AbstractQueueTestBase extends QpidTestCase
assertFalse(_queue.isDeleted());
}
+    /** Runs the shared move/copy scenario in "move" mode. */
+    public void testMoveMessages() throws Exception
+    {
+        final boolean move = true;
+        doMoveOrCopyMessageTest(move);
+    }
+
+    /** Runs the shared move/copy scenario in "copy" mode. */
+    public void testCopyMessages() throws Exception
+    {
+        final boolean move = false;
+        doMoveOrCopyMessageTest(move);
+    }
+
+    /**
+     * Puts three messages on the source queue, transfers them all to a fresh target queue using a
+     * match-all selector, and checks the depth of both queues afterwards.
+     *
+     * @param move {@code true} to move the messages, {@code false} to copy them
+     */
+    private void doMoveOrCopyMessageTest(final boolean move)
+    {
+        final String targetName = getTestName() + "_target";
+        Queue target = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, targetName));
+
+        for (long messageId = 1L; messageId <= 3L; messageId++)
+        {
+            _queue.enqueue(createMessage(messageId), null, null);
+        }
+
+        assertEquals("Unexpected number of messages on source queue", 3, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue before test", 0, target.getQueueDepthMessages());
+
+        // A move drains the source queue, a copy leaves it untouched.
+        final int expectedSourceDepth;
+        if (move)
+        {
+            _queue.moveMessages(target, null, "true = true", -1);
+            expectedSourceDepth = 0;
+        }
+        else
+        {
+            _queue.copyMessages(target, null, "true = true", -1);
+            expectedSourceDepth = 3;
+        }
+
+        assertEquals("Unexpected number of messages on source queue after test", expectedSourceDepth, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue after test", 3, target.getQueueDepthMessages());
+    }
+
+    /**
+     * Copies three messages to a target queue limited to two messages and checks that only two arrive
+     * while the source queue keeps all three.
+     */
+    public void testCopyMessageRespectsQueueSizeLimits() throws Exception
+    {
+        final int targetCapacity = 2;
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, getTestName() + "_target");
+        attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, targetCapacity);
+        Queue target = _virtualHost.createChild(Queue.class, attributes);
+
+        for (long messageId = 1L; messageId <= 3L; messageId++)
+        {
+            _queue.enqueue(createMessage(messageId), null, null);
+        }
+
+        assertEquals("Unexpected number of messages on source queue", 3, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue before test", 0, target.getQueueDepthMessages());
+
+        _queue.copyMessages(target, null, "true = true", -1);
+
+        assertEquals("Unexpected number of messages on source queue after test", 3, _queue.getQueueDepthMessages());
+        assertEquals("Unexpected number of messages on target queue after test", targetCapacity, target.getQueueDepthMessages());
+    }
+
private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java
new file mode 100644
index 0000000..cc821ee
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RejectPolicyHandlerTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+/**
+ * Unit tests for {@link RejectPolicyHandler}, run against a mocked {@link Queue}.
+ */
+public class RejectPolicyHandlerTest extends QpidTestCase
+{
+    private RejectPolicyHandler _rejectOverflowPolicyHandler;
+    private Queue<?> _queue;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+
+        // Baseline: no messages on the queue and both limits disabled (negative maximum).
+        _queue = mock(Queue.class);
+        when(_queue.getName()).thenReturn("testQueue");
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(-1L);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
+        when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.REJECT);
+        when(_queue.getQueueDepthMessages()).thenReturn(0);
+
+        _rejectOverflowPolicyHandler = new RejectPolicyHandler(_queue);
+    }
+
+    /** 5 bytes queued + 6 incoming exceeds the 10 byte maximum. */
+    public void testOverfullBytes() throws Exception
+    {
+        ServerMessage incomingMessage = createIncomingMessage(6);
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(10L);
+        when(_queue.getQueueDepthBytes()).thenReturn(5L);
+        when(_queue.getQueueDepthMessages()).thenReturn(1);
+
+        assertRejected(incomingMessage);
+    }
+
+    /** 7 messages queued + 1 incoming exceeds the 7 message maximum. */
+    public void testOverfullMessages() throws Exception
+    {
+        ServerMessage incomingMessage = createIncomingMessage(5);
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(7L);
+        when(_queue.getQueueDepthMessages()).thenReturn(7);
+        when(_queue.getQueueDepthBytes()).thenReturn(10L);
+
+        assertRejected(incomingMessage);
+    }
+
+    /** Once the first message is reported as enqueued, a second one fits a limit of one message. */
+    public void testNotOverfullMessages() throws Exception
+    {
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(1L);
+
+        ServerMessage first = createIncomingMessage(2);
+        MessageInstance firstInstance = createInstanceFor(first);
+        ServerMessage second = createIncomingMessage(2);
+
+        _rejectOverflowPolicyHandler.checkReject(first);
+        _rejectOverflowPolicyHandler.postEnqueue(firstInstance);
+
+        _rejectOverflowPolicyHandler.checkReject(second);
+    }
+
+    /** Once the first message is reported as enqueued, a second 5 byte message fits a 9 byte limit. */
+    public void testNotOverfullBytes() throws Exception
+    {
+        when(_queue.getMaximumQueueDepthBytes()).thenReturn(9L);
+
+        ServerMessage first = createIncomingMessage(5);
+        MessageInstance firstInstance = createInstanceFor(first);
+        ServerMessage second = createIncomingMessage(5);
+
+        _rejectOverflowPolicyHandler.checkReject(first);
+        _rejectOverflowPolicyHandler.postEnqueue(firstInstance);
+
+        _rejectOverflowPolicyHandler.checkReject(second);
+    }
+
+    /** Deleting an accepted message frees its slot for the next incoming message. */
+    public void testIncomingMessageDeleted() throws Exception
+    {
+        when(_queue.getMaximumQueueDepthMessages()).thenReturn(1L);
+
+        ServerMessage first = createIncomingMessage(2);
+        ServerMessage second = createIncomingMessage(2);
+
+        _rejectOverflowPolicyHandler.checkReject(first);
+        _rejectOverflowPolicyHandler.messageDeleted(first.getStoredMessage());
+
+        _rejectOverflowPolicyHandler.checkReject(second);
+    }
+
+    /** Fails the test unless the handler refuses the message. */
+    private void assertRejected(final ServerMessage incomingMessage)
+    {
+        try
+        {
+            _rejectOverflowPolicyHandler.checkReject(incomingMessage);
+            fail("Exception expected");
+        }
+        catch (MessageUnacceptableException expected)
+        {
+            // pass
+        }
+    }
+
+    /** Creates a mocked message instance that wraps the given message. */
+    private MessageInstance createInstanceFor(final ServerMessage message)
+    {
+        MessageInstance instance = mock(MessageInstance.class);
+        when(instance.getMessage()).thenReturn(message);
+        return instance;
+    }
+
+    /** Creates a mocked message of the given size with its own mocked header and stored message. */
+    private ServerMessage createIncomingMessage(final long size)
+    {
+        AMQMessageHeader incomingMessageHeader = mock(AMQMessageHeader.class);
+        StoredMessage storedMessage = mock(StoredMessage.class);
+        ServerMessage incomingMessage = mock(ServerMessage.class);
+        when(incomingMessage.getMessageHeader()).thenReturn(incomingMessageHeader);
+        when(incomingMessage.getSizeIncludingHeader()).thenReturn(size);
+        when(incomingMessage.getStoredMessage()).thenReturn(storedMessage);
+        return incomingMessage;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index ee8def1..e70e9ff 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -147,7 +147,7 @@ public abstract class MessageStoreTestCase extends QpidTestCase
}
- public void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName)
+ private void enqueueMessage(final StoredMessage<TestMessageMetaData> message, final String queueName)
{
Transaction txn = _store.newTransaction();
txn.enqueueMessage(new TransactionLogResource()
@@ -417,6 +417,19 @@ public abstract class MessageStoreTestCase extends QpidTestCase
assertNull(retrievedMessageRef.get());
}
+ public void testMessageDeleted() throws Exception
+ {
+ MessageStore.MessageDeleteListener listener = mock(MessageStore.MessageDeleteListener.class);
+ _store.addMessageDeleteListener(listener); // register the mock so the store can notify it
+ // Add a message with id 1 and zero content size to the store, then remove it.
+ long messageId = 1;
+ int contentSize = 0;
+ final MessageHandle<TestMessageMetaData> messageHandle = _store.addMessage(new TestMessageMetaData(messageId, contentSize));
+ StoredMessage<TestMessageMetaData> message = messageHandle.allContentAdded();
+ message.remove();
+ // The listener must have been called exactly once, with the removed message as the argument.
+ verify(listener, times(1)).messageDeleted(message);
+ }
private TransactionLogResource createTransactionLogResource(UUID queueId)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 7da0d0f..989ff4e 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -956,9 +956,9 @@ public class ServerSession extends SessionInvoker
return isCommandsFull(id);
}
- public int enqueue(final MessageTransferMessage message,
- final InstanceProperties instanceProperties,
- final MessageDestination exchange)
+ RoutingResult<MessageTransferMessage> enqueue(final MessageTransferMessage message,
+ final InstanceProperties instanceProperties,
+ final MessageDestination exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -970,10 +970,10 @@ public class ServerSession extends SessionInvoker
final long arrivalTime = message.getArrivalTime();
final RoutingResult<MessageTransferMessage> result =
exchange.route(message, message.getInitialRoutingAddress(), instanceProperties);
- int enqueues = result.send(_transaction, null);
+ result.send(_transaction, null);
getAMQPConnection().registerMessageReceived(message.getSize(), arrivalTime);
incrementOutstandingTxnsIfNecessary();
- return enqueues;
+ return result;
}
public void sendMessage(MessageTransfer xfr,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c5e340f0/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 8c3f8c9..ae529a4 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -52,6 +52,8 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.RejectType;
+import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
@@ -488,32 +490,50 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
};
- int enqueues = serverSession.enqueue(message, instanceProperties, destination);
+ RoutingResult<MessageTransferMessage> routingResult = serverSession.enqueue(message, instanceProperties, destination);
- if (enqueues == 0)
+ boolean explictlyRejected = routingResult.containsReject(RejectType.LIMIT_EXCEEDED);
+ if (!routingResult.hasRoutes() || explictlyRejected)
{
- if ((delvProps == null || !delvProps.getDiscardUnroutable())
- && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ boolean closeWhenNoRoute = serverSession.getAMQPConnection().getBroker().getConnection_closeWhenNoRoute();
+ boolean discardUnroutable = delvProps != null && delvProps.getDiscardUnroutable();
+ if (!discardUnroutable && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
RangeSet rejects = RangeSetFactory.createRangeSet();
rejects.add(xfr.getId());
MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
ssn.invoke(reject);
}
+ else if (!discardUnroutable && closeWhenNoRoute && explictlyRejected)
+ {
+ ExecutionErrorCode code = ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED;
+ String errorMessage = String.format("No route for message with destination '%s' and routing key '%s' : %s",
+ xfr.getDestination(),
+ message.getInitialRoutingAddress(),
+ routingResult.getRejectReason());
+
+ ExecutionException ex = new ExecutionException();
+ ex.setErrorCode(code);
+ ex.setDescription(errorMessage);
+ serverSession.invoke(ex);
+ serverSession.close(ErrorCodes.RESOURCE_ERROR, errorMessage);
+ return;
+ }
else
{
getEventLogger(ssn).message(ExchangeMessages.DISCARDMSG(destination.getName(),
- messageMetaData.getRoutingKey()));
+ messageMetaData.getRoutingKey()));
}
}
+ // TODO: we currently do not send MessageAccept when AcceptMode is EXPLICIT
if (serverSession.isTransactional())
{
serverSession.processed(xfr);
}
else
{
- serverSession.recordFuture(Futures.<Void>immediateFuture(null),
+ serverSession.recordFuture(Futures.immediateFuture(null),
new CommandProcessedAction(serverSession, xfr));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org