You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by bh...@apache.org on 2007/03/15 16:44:02 UTC

svn commit: r518669 - in /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue: AMQQueue.java ConcurrentSelectorDeliveryManager.java DeliveryManager.java

Author: bhupendrab
Date: Thu Mar 15 08:44:01 2007
New Revision: 518669

URL: http://svn.apache.org/viewvc?view=rev&rev=518669
Log:
- DeliveryManager.getMessages is reimplemented because ConcurrentLinkedMessageQueueAtomicSize.toArray is not implemented
- Not acquiring the lock in startMovingMessages. Just setting _movingMessages to true, because that can stop the sync delivery.
- And some tidy up of the code.

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=518669&r1=518668&r2=518669
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Mar 15 08:44:01 2007
@@ -235,37 +235,40 @@
         return _deliveryMgr.getMessages();
     }
 
+    /**
+     * Returns messages within the given range of message Ids
+     * @param fromMessageId
+     * @param toMessageId
+     * @return List of messages
+     */
+    public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+    {
+        return _deliveryMgr.getMessages(fromMessageId, toMessageId);
+    }
+
     public long getQueueDepth()
     {
         return _deliveryMgr.getTotalMessageSize();
     }
 
-
     /**
      * @param messageId
-     *
      * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
      */
     public AMQMessage getMessageOnTheQueue(long messageId)
     {
-        List<AMQMessage> list = getMessagesOnTheQueue();
-        AMQMessage msg = null;
-        for (AMQMessage message : list)
+        List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId);
+        if (list == null || list.size() == 0)
         {
-            if (message.getMessageId() == messageId)
-            {
-                msg = message;
-                break;
-            }
+            return null;
         }
-
-        return msg;
+        return list.get(0);
     }
 
     /**
      * moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
-     * moving messages (hold the lock and stop the async delivery) - get all the messages available in the given message
-     * id range - setup the other queue for moving messages (hold the lock and stop the async delivery) - send these
+     * moving messages (stop the async delivery) - get all the messages available in the given message
+     * id range - setup the other queue for moving messages (stop the async delivery) - send these
      * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
      * remove messages from this queue - remove locks from both queues and start async delivery
      *
@@ -282,24 +285,7 @@
         try
         {
             startMovingMessages();
-            List<AMQMessage> list = getMessagesOnTheQueue();
-            List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
-            int maxMessageCountToBeMoved = (int) (toMessageId - fromMessageId + 1);
-
-            // Run this loop till you find all the messages or the list has no more messages
-            for (AMQMessage message : list)
-            {
-                long msgId = message.getMessageId();
-                if (msgId >= fromMessageId && msgId <= toMessageId)
-                {
-                    foundMessagesList.add(message);
-                }
-                // break the loop as soon as messages to be removed are found
-                if (foundMessagesList.size() == maxMessageCountToBeMoved)
-                {
-                    break;
-                }
-            }
+            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
             // move messages to another queue
             anotherQueue.startMovingMessages();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=518669&r1=518668&r2=518669
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Thu Mar 15 08:44:01 2007
@@ -208,15 +208,61 @@
         }
     }
 
-
+    /**
+     * Returns all the messages in the Queue
+     * @return List of messages
+     */
     public List<AMQMessage> getMessages()
     {
         _lock.lock();
-        ArrayList<AMQMessage> list = new ArrayList<AMQMessage>(_messages);
+        List<AMQMessage> list = new ArrayList<AMQMessage>();
+
+        for (AMQMessage message : _messages)
+        {
+            list.add(message);
+        }
         _lock.unlock();
+        
         return list;
     }
 
+    /**
+     * Returns messages within the range of given messageIds
+     * @param fromMessageId
+     * @param toMessageId
+     * @return List of messages with ids in the given range, or null if either id is not positive
+     */
+    public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
+    {
+        if (fromMessageId <= 0 || toMessageId <= 0)
+        {
+            return null;
+        }
+
+        long maxMessageCount = toMessageId - fromMessageId + 1;
+
+        _lock.lock();
+        
+        List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
+
+        for (AMQMessage message : _messages)
+        {
+            long msgId = message.getMessageId();
+            if (msgId >= fromMessageId && msgId <= toMessageId)
+            {
+                foundMessagesList.add(message);
+            }
+            // break if the no of messages are found
+            if (foundMessagesList.size() == maxMessageCount)
+            {
+                break;
+            }
+        }
+        _lock.unlock();
+
+        return foundMessagesList;
+    }
+
     public void populatePreDeliveryQueue(Subscription subscription)
     {
         if (_log.isTraceEnabled())
@@ -294,7 +340,6 @@
      */
     public void startMovingMessages()
     {
-        _lock.lock();
         _movingMessages.set(true);
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=518669&r1=518668&r2=518669
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Thu Mar 15 08:44:01 2007
@@ -86,6 +86,8 @@
 
     List<AMQMessage> getMessages();
 
+    List<AMQMessage> getMessages(long fromMessageId, long toMessageId);
+
     void populatePreDeliveryQueue(Subscription subscription);
 
     boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;