You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/06/17 09:06:54 UTC
svn commit: r1748818 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server: model/
queue/
Author: rgodfrey
Date: Fri Jun 17 09:06:54 2016
New Revision: 1748818
URL: http://svn.apache.org/viewvc?rev=1748818&view=rev
Log:
QPID-7305 : Add optional limit argument to queue operations copy/move/delete
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Fri Jun 17 09:06:54 2016
@@ -274,18 +274,21 @@ public interface Queue<X extends Queue<X
@ManagedOperation
List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved") Queue<?> destination,
@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
- @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for moving") String selector);
+ @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for moving") String selector,
+ @Param(name = "limit", description = "Maximum number of messages to move", defaultValue = "-1") int limit);
@ManagedOperation
List<Long> copyMessages(@Param(name = "destination", description = "The queue to which the messages should be copied") Queue<?> destination,
@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for copying") List<Long> messageIds,
- @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for copying") String selector);
+ @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for copying") String selector,
+ @Param(name = "limit", description = "Maximum number of messages to copy", defaultValue = "-1") int limit);
@ManagedOperation
List<Long> deleteMessages(@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for deletion") List<Long> messageIds,
- @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for deletion") String selector);
+ @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for deletion") String selector,
+ @Param(name = "limit", description = "Maximum number of messages to delete", defaultValue = "-1") int limit);
@ManagedOperation
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Jun 17 09:06:54 2016
@@ -1831,7 +1831,7 @@ public abstract class AbstractQueue<X ex
}
/**
- * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
+ * Returns a list of QueueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
*
* The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
* Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
@@ -3416,13 +3416,14 @@ public abstract class AbstractQueue<X ex
}
@Override
- public List<Long> moveMessages(Queue<?> destination, List<Long> messageIds, final String selector)
+ public List<Long> moveMessages(Queue<?> destination, List<Long> messageIds, final String selector, final int limit)
{
// FIXME: added temporary authorization check until we introduce management layer
// and review current ACL rules to have common rules for all management interfaces
authorizeMethod("moveMessages");
- MoveMessagesTransaction transaction = new MoveMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+ MoveMessagesTransaction transaction = new MoveMessagesTransaction(this, messageIds, destination, parseSelector(selector),
+ limit);
_virtualHost.executeTransaction(transaction);
return transaction.getModifiedMessageIds();
@@ -3441,27 +3442,29 @@ public abstract class AbstractQueue<X ex
}
@Override
- public List<Long> copyMessages(Queue<?> destination, List<Long> messageIds, final String selector)
+ public List<Long> copyMessages(Queue<?> destination, List<Long> messageIds, final String selector, int limit)
{
// FIXME: added temporary authorization check until we introduce management layer
// and review current ACL rules to have common rules for all management interfaces
authorizeMethod("copyMessages");
- CopyMessagesTransaction transaction = new CopyMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+ CopyMessagesTransaction transaction = new CopyMessagesTransaction(this, messageIds, destination, parseSelector(selector),
+ limit);
_virtualHost.executeTransaction(transaction);
return transaction.getModifiedMessageIds();
}
@Override
- public List<Long> deleteMessages(final List<Long> messageIds, final String selector)
+ public List<Long> deleteMessages(final List<Long> messageIds, final String selector, int limit)
{
// FIXME: added temporary authorization check until we introduce management layer
// and review current ACL rules to have common rules for all management interfaces
authorizeMethod("deleteMessages");
- DeleteMessagesTransaction transaction = new DeleteMessagesTransaction(this, messageIds, parseSelector(selector));
+ DeleteMessagesTransaction transaction = new DeleteMessagesTransaction(this, messageIds, parseSelector(selector),
+ limit);
_virtualHost.executeTransaction(transaction);
return transaction.getModifiedMessageIds();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java Fri Jun 17 09:06:54 2016
@@ -35,9 +35,9 @@ public class CopyMessagesTransaction ext
public CopyMessagesTransaction(Queue sourceQueue,
List<Long> messageIds,
Queue destinationQueue,
- final MessageFilter filter)
+ final MessageFilter filter, final int limit)
{
- super(sourceQueue, messageIds, filter);
+ super(sourceQueue, messageIds, filter, limit);
_destinationQueue = destinationQueue;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java Fri Jun 17 09:06:54 2016
@@ -28,9 +28,12 @@ import org.apache.qpid.server.model.Virt
public class DeleteMessagesTransaction extends QueueEntryTransaction
{
- public DeleteMessagesTransaction(Queue sourceQueue, List<Long> messageIds, final MessageFilter filter)
+ public DeleteMessagesTransaction(Queue sourceQueue,
+ List<Long> messageIds,
+ final MessageFilter filter,
+ final int limit)
{
- super(sourceQueue, messageIds, filter);
+ super(sourceQueue, messageIds, filter, limit);
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java Fri Jun 17 09:06:54 2016
@@ -35,9 +35,9 @@ public class MoveMessagesTransaction ext
public MoveMessagesTransaction(Queue sourceQueue,
List<Long> messageIds,
Queue destinationQueue,
- final MessageFilter filter)
+ final MessageFilter filter, final int limit)
{
- super(sourceQueue, messageIds, filter);
+ super(sourceQueue, messageIds, filter, limit);
_destinationQueue = destinationQueue;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java Fri Jun 17 09:06:54 2016
@@ -34,25 +34,26 @@ abstract class QueueEntryTransaction imp
private final List<Long> _messageIds;
private final MessageFilter _filter;
private final List<Long> _modifiedMessageIds = new ArrayList<>();
+ private int _limit;
- QueueEntryTransaction(Queue sourceQueue, List<Long> messageIds, final MessageFilter filter)
+ QueueEntryTransaction(Queue sourceQueue, List<Long> messageIds, final MessageFilter filter, final int limit)
{
_sourceQueue = sourceQueue;
_messageIds = messageIds == null ? null : new ArrayList<>(messageIds);
_filter = filter;
+ _limit = limit <= 0 ? -1 : limit; // non-positive => -1 (no limit); must read the 'limit' argument, not the still-zero _limit field
}
@Override
public final void withinTransaction(final VirtualHost.Transaction txn)
{
-
_sourceQueue.visit(new QueueEntryVisitor()
{
public boolean visit(final QueueEntry entry)
{
final ServerMessage message = entry.getMessage();
- if(message != null)
+ if (message != null)
{
final long messageId = message.getMessageNumber();
if ((_messageIds == null || _messageIds.remove(messageId))
@@ -60,11 +61,16 @@ abstract class QueueEntryTransaction imp
{
updateEntry(entry, txn);
_modifiedMessageIds.add(messageId);
+ if (_limit > 0)
+ {
+ _limit--;
+ }
}
}
- return _messageIds != null && _messageIds.isEmpty();
+ return _limit == 0 || (_messageIds != null && _messageIds.isEmpty());
}
});
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org