You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2006/10/24 13:50:10 UTC

svn commit: r467313 - in /incubator/qpid/trunk/qpid/java: broker/etc/ broker/src/org/apache/qpid/server/queue/ broker/src/org/apache/qpid/server/util/ common/src/org/apache/qpid/framing/

Author: ritchiem
Date: Tue Oct 24 04:50:09 2006
New Revision: 467313

URL: http://svn.apache.org/viewvc?view=rev&rev=467313
Log:
QPID-48
Enabled buffer compression on delivery queue as a configuration option from the xml.(advanced.compressBufferOnQueue)
Changed DeliveryManager.java to use ConcurrentLinkedQueueNoSize.java this is the standard  ConcurrentLinkedQueue but where the size() method returns 0(empty) or 1(has content) as it is an expensive operation.
Previously there was a race condition with the use of the _queueing boolean. Under load the consumer would sometimes stop getting messages. This was due to the messages being enqueued without starting the Worker thread. 

There is still an issue (QPID-54) on high receiving load messages rate received by a client drops.

Added:
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/etc/config.xml
    incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/ContentBody.java

Modified: incubator/qpid/trunk/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/config.xml?view=diff&rev=467313&r1=467312&r2=467313
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/config.xml (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/config.xml Tue Oct 24 04:50:09 2006
@@ -37,6 +37,7 @@
         <enablePooledAllocator>false</enablePooledAllocator>
         <enableDirectBuffers>false</enableDirectBuffers>
         <framesize>65535</framesize>
+        <compressBufferOnQueue>false</compressBufferOnQueue>
     </advanced>
     <security>
         <principal-databases>

Modified: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=467313&r1=467312&r2=467313
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/DeliveryManager.java Tue Oct 24 04:50:09 2006
@@ -17,28 +17,35 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.configuration.Configurator;
+import org.apache.qpid.server.util.ConcurrentLinkedQueueNoSize;
 
-import java.util.LinkedList;
-import java.util.Queue;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Manages delivery of messages on behalf of a queue
- *
  */
-class DeliveryManager
+public class DeliveryManager
 {
     private static final Logger _log = Logger.getLogger(DeliveryManager.class);
 
+    @Configured(path = "advanced.compressBufferOnQueue",
+                defaultValue = "false")
+    public boolean compressBufferOnQueue;
     /**
      * Holds any queued messages
      */
-    private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+    private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueNoSize<AMQMessage>();
+    //private int _messageCount;
     /**
      * Ensures that only one asynchronous task is running for this manager at
      * any time.
@@ -50,72 +57,124 @@
     private final SubscriptionManager _subscriptions;
 
     /**
-     * An indication of the mode we are in. If this is true then messages are
-     * being queued up in _messages for asynchronous delivery. If it is false
-     * then messages can be delivered directly as they come in.
-     */
-    private boolean _queueing;
-
-    /**
      * A reference to the queue we are delivering messages for. We need this to be able
      * to pass the code that handles acknowledgements a handle on the queue.
      */
     private final AMQQueue _queue;
 
+
+    private volatile int _queueSize = 0;
+
     DeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
     {
+        //Set values from configuration
+        Configurator.configure(this);
+
+        if (compressBufferOnQueue)
+        {
+            _log.info("Compressing Buffers on queue.");
+        }
+
         _subscriptions = subscriptions;
         _queue = queue;
     }
 
-    private synchronized boolean enqueue(AMQMessage msg)
+    /**
+     * @return boolean if we are queueing
+     */
+    private boolean queueing()
     {
-        if (_queueing)
+        return getMessageCount() != 0;
+    }
+
+
+    /**
+     * @param msg to enqueue
+     * @return true if we are queue this message
+     */
+    private boolean enqueue(AMQMessage msg)
+    {
+        if (msg.isImmediate())
+        {
+            return false;
+        }
+        else
         {
-            if(msg.isImmediate())
+            if (queueing())
             {
-                //can't enqueue messages for whom immediate delivery is required
-                return false;
+                return addMessageToQueue(msg);
             }
             else
             {
-                _messages.offer(msg);
-                return true;
+                return false;
             }
         }
-        else
+    }
+
+    private void startQueueing(AMQMessage msg)
+    {
+        if (!msg.isImmediate())
         {
-            return false;
+            addMessageToQueue(msg);
         }
     }
 
-    private synchronized void startQueueing(AMQMessage msg)
+    private boolean addMessageToQueue(AMQMessage msg)
     {
-        _queueing = true;
-        enqueue(msg);
+        // Shrink the ContentBodies to their actual size to save memory.
+        // synchronize to ensure this msg is the next one to get added.
+        if (compressBufferOnQueue)
+        {
+            synchronized(_messages)
+            {
+                Iterator it = msg.getContentBodies().iterator();
+                while (it.hasNext())
+                {
+                    ContentBody cb = (ContentBody) it.next();
+                    cb.reduceBufferToFit();
+                }
+
+                _messages.offer(msg);
+                _queueSize++;
+            }
+        }
+        else
+        {
+            _messages.offer(msg);
+            _queueSize++;
+        }
+        return true;
     }
 
+
     /**
      * Determines whether there are queued messages. Sets _queueing to false if
      * there are no queued messages. This needs to be atomic.
      *
      * @return true if there are queued messages
      */
-    private synchronized boolean hasQueuedMessages()
+    private boolean hasQueuedMessages()
     {
-        boolean empty = _messages.isEmpty();
-        if (empty)
-        {
-            _queueing = false;
-        }
-        return !empty;
+        return getMessageCount() != 0;
     }
 
-    public synchronized int getQueueMessageCount()
+    public int getQueueMessageCount()
+    {
+        return getMessageCount();
+    }
+
+    /**
+     * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
+     *
+     * @return int the number of messages in the delivery queue.
+     */
+
+    private int getMessageCount()
     {
         return _messages.size();
     }
 
+
     protected synchronized List<AMQMessage> getMessages()
     {
         return new ArrayList<AMQMessage>(_messages);
@@ -151,7 +210,9 @@
             boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
             while (hasQueuedMessages() && hasSubscribers)
             {
-                Subscription next =  _subscriptions.nextSubscriber(peek());
+                // _log.debug("Have messages(" + _messages.size() + ") and subscribers");
+                Subscription next = _subscriptions.nextSubscriber(peek());
+
                 //We don't synchronize access to subscribers so need to re-check
                 if (next != null)
                 {
@@ -169,20 +230,24 @@
         }
         finally
         {
+            _log.debug("End of processQueue: (" + _queueSize + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
             _processing.set(false);
         }
     }
 
-    private synchronized AMQMessage peek()
+    private AMQMessage peek()
     {
         return _messages.peek();
     }
 
-    private synchronized AMQMessage poll()
+    private AMQMessage poll()
     {
+        _queueSize--;
         return _messages.poll();
     }
 
+    Runner asyncDelivery = new Runner();
+
     /**
      * Requests that the delivery manager start processing the queue asynchronously
      * if there is work that can be done (i.e. there are messages queued up and
@@ -197,12 +262,16 @@
      */
     void processAsync(Executor executor)
     {
+        _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + _queueSize + ")" +
+                   " Active:" + _subscriptions.hasActiveSubscribers() +
+                   " Processing:" + _processing.get());
+
         if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
         {
             //are we already running? if so, don't re-run
             if (_processing.compareAndSet(false, true))
             {
-                executor.execute(new Runner());
+                executor.execute(asyncDelivery);
             }
         }
     }
@@ -214,8 +283,6 @@
      *
      * @param name the name of the entity on whose behalf we are delivering the message
      * @param msg  the message to deliver
-     * @throws NoConsumersException if there are no active subscribers to deliver
-     *                              the message to
      * @throws FailedDequeueException if the message could not be dequeued
      */
     void deliver(String name, AMQMessage msg) throws FailedDequeueException
@@ -224,11 +291,27 @@
         if (!enqueue(msg))
         {
             // not queueing so deliver message to 'next' subscriber
-            Subscription s =  _subscriptions.nextSubscriber(msg);
+            Subscription s = _subscriptions.nextSubscriber(msg);
             if (s == null)
             {
                 if (!msg.isImmediate())
                 {
+                    if (_subscriptions instanceof SubscriptionSet)
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers()
+                                       + " Size :" + ((SubscriptionSet) _subscriptions).size()
+                                       + " Empty :" + ((SubscriptionSet) _subscriptions).isEmpty());
+                        }
+                    }
+                    else
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Start Queueing messages Active Subs:" + _subscriptions.hasActiveSubscribers());
+                        }
+                    }
                     // no subscribers yet so enter 'queueing' mode and queue this message
                     startQueueing(msg);
                 }

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java?view=auto&rev=467313
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java Tue Oct 24 04:50:09 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E>
+{
+    public int size()
+    {
+        if (isEmpty())
+        {
+            return 0;
+        }
+        else
+        {
+            return 1;
+        }
+    }        
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/ContentBody.java?view=diff&rev=467313&r1=467312&r2=467313
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/ContentBody.java Tue Oct 24 04:50:09 2006
@@ -63,8 +63,7 @@
             ByteBuffer newPayload = ByteBuffer.allocate(size);
 
             newPayload.put(payload);
-            newPayload.position(0);
-            newPayload.limit(size);
+            newPayload.flip();
 
             //reduce reference count on payload
             payload.release();