You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/06/17 09:06:54 UTC

svn commit: r1748818 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server: model/ queue/

Author: rgodfrey
Date: Fri Jun 17 09:06:54 2016
New Revision: 1748818

URL: http://svn.apache.org/viewvc?rev=1748818&view=rev
Log:
QPID-7305 : Add optional limit argument to queue operations copy/move/delete

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Fri Jun 17 09:06:54 2016
@@ -274,18 +274,21 @@ public interface Queue<X extends Queue<X
     @ManagedOperation
     List<Long> moveMessages(@Param(name = "destination", description = "The queue to which the messages should be moved") Queue<?> destination,
                             @Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for moving") List<Long> messageIds,
-                            @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for moving") String selector);
+                            @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for moving") String selector,
+                            @Param(name = "limit", description = "Maximum number of messages to move", defaultValue = "-1") int limit);
 
 
     @ManagedOperation
     List<Long> copyMessages(@Param(name = "destination", description = "The queue to which the messages should be copied") Queue<?> destination,
                             @Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for copying") List<Long> messageIds,
-                            @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for copying")  String selector);
+                            @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for copying")  String selector,
+                            @Param(name = "limit", description = "Maximum number of messages to copy", defaultValue = "-1") int limit);
 
 
     @ManagedOperation
     List<Long> deleteMessages(@Param(name = "messageIds", description = "If provided, only messages in the queue whose (internal) message-id is supplied will be considered for deletion") List<Long> messageIds,
-                              @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for deletion") String selector);
+                              @Param(name = "selector", description = "A (JMS) selector - if provided, only messages which match the selector will be considered for deletion") String selector,
+                              @Param(name = "limit", description = "Maximum number of messages to delete", defaultValue = "-1") int limit);
 
 
     @ManagedOperation

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Jun 17 09:06:54 2016
@@ -1831,7 +1831,7 @@ public abstract class AbstractQueue<X ex
     }
 
     /**
-     * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
+     * Returns a list of QueueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
      *
      * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
      * Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
@@ -3416,13 +3416,14 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public List<Long> moveMessages(Queue<?> destination, List<Long> messageIds, final String selector)
+    public List<Long> moveMessages(Queue<?> destination, List<Long> messageIds, final String selector, final int limit)
     {
         // FIXME: added temporary authorization check until we introduce management layer
         // and review current ACL rules to have common rules for all management interfaces
         authorizeMethod("moveMessages");
 
-        MoveMessagesTransaction transaction = new MoveMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+        MoveMessagesTransaction transaction = new MoveMessagesTransaction(this, messageIds, destination, parseSelector(selector),
+                                                                          limit);
         _virtualHost.executeTransaction(transaction);
         return transaction.getModifiedMessageIds();
 
@@ -3441,27 +3442,29 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public List<Long> copyMessages(Queue<?> destination, List<Long> messageIds, final String selector)
+    public List<Long> copyMessages(Queue<?> destination, List<Long> messageIds, final String selector, int limit)
     {
         // FIXME: added temporary authorization check until we introduce management layer
         // and review current ACL rules to have common rules for all management interfaces
         authorizeMethod("copyMessages");
 
-        CopyMessagesTransaction transaction = new CopyMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+        CopyMessagesTransaction transaction = new CopyMessagesTransaction(this, messageIds, destination, parseSelector(selector),
+                                                                          limit);
         _virtualHost.executeTransaction(transaction);
         return transaction.getModifiedMessageIds();
 
     }
 
     @Override
-    public List<Long> deleteMessages(final List<Long> messageIds, final String selector)
+    public List<Long> deleteMessages(final List<Long> messageIds, final String selector, int limit)
     {
 
         // FIXME: added temporary authorization check until we introduce management layer
         // and review current ACL rules to have common rules for all management interfaces
         authorizeMethod("deleteMessages");
 
-        DeleteMessagesTransaction transaction = new DeleteMessagesTransaction(this, messageIds, parseSelector(selector));
+        DeleteMessagesTransaction transaction = new DeleteMessagesTransaction(this, messageIds, parseSelector(selector),
+                                                                              limit);
         _virtualHost.executeTransaction(transaction);
 
         return transaction.getModifiedMessageIds();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java Fri Jun 17 09:06:54 2016
@@ -35,9 +35,9 @@ public class CopyMessagesTransaction ext
     public CopyMessagesTransaction(Queue sourceQueue,
                                    List<Long> messageIds,
                                    Queue destinationQueue,
-                                   final MessageFilter filter)
+                                   final MessageFilter filter, final int limit)
     {
-        super(sourceQueue, messageIds, filter);
+        super(sourceQueue, messageIds, filter, limit);
         _destinationQueue = destinationQueue;
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java Fri Jun 17 09:06:54 2016
@@ -28,9 +28,12 @@ import org.apache.qpid.server.model.Virt
 
 public class DeleteMessagesTransaction extends QueueEntryTransaction
 {
-    public DeleteMessagesTransaction(Queue sourceQueue, List<Long> messageIds, final MessageFilter filter)
+    public DeleteMessagesTransaction(Queue sourceQueue,
+                                     List<Long> messageIds,
+                                     final MessageFilter filter,
+                                     final int limit)
     {
-        super(sourceQueue, messageIds, filter);
+        super(sourceQueue, messageIds, filter, limit);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java Fri Jun 17 09:06:54 2016
@@ -35,9 +35,9 @@ public class MoveMessagesTransaction ext
     public MoveMessagesTransaction(Queue sourceQueue,
                                    List<Long> messageIds,
                                    Queue destinationQueue,
-                                   final MessageFilter filter)
+                                   final MessageFilter filter, final int limit)
     {
-        super(sourceQueue, messageIds, filter);
+        super(sourceQueue, messageIds, filter, limit);
         _destinationQueue = destinationQueue;
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java?rev=1748818&r1=1748817&r2=1748818&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java Fri Jun 17 09:06:54 2016
@@ -34,25 +34,26 @@ abstract class QueueEntryTransaction imp
     private final List<Long> _messageIds;
     private final MessageFilter _filter;
     private final List<Long> _modifiedMessageIds = new ArrayList<>();
+    private int _limit;
 
-    QueueEntryTransaction(Queue sourceQueue, List<Long> messageIds, final MessageFilter filter)
+    QueueEntryTransaction(Queue sourceQueue, List<Long> messageIds, final MessageFilter filter, final int limit)
     {
         _sourceQueue = sourceQueue;
         _messageIds = messageIds == null ? null : new ArrayList<>(messageIds);
         _filter = filter;
+        _limit = limit <= 0 ? -1 : limit; // non-positive limit is stored as -1, which never counts down to 0 (no limit)
     }
 
     @Override
     public final void withinTransaction(final VirtualHost.Transaction txn)
     {
-
         _sourceQueue.visit(new QueueEntryVisitor()
         {
 
             public boolean visit(final QueueEntry entry)
             {
                 final ServerMessage message = entry.getMessage();
-                if(message != null)
+                if (message != null)
                 {
                     final long messageId = message.getMessageNumber();
                     if ((_messageIds == null || _messageIds.remove(messageId))
@@ -60,11 +61,16 @@ abstract class QueueEntryTransaction imp
                     {
                         updateEntry(entry, txn);
                         _modifiedMessageIds.add(messageId);
+                        if (_limit > 0)
+                        {
+                            _limit--;
+                        }
                     }
                 }
-                return _messageIds != null && _messageIds.isEmpty();
+                return _limit == 0 || (_messageIds != null && _messageIds.isEmpty());
             }
         });
+
     }
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org