You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/08/07 13:15:02 UTC

svn commit: r683583 - in /incubator/qpid/trunk/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main...

Author: aidan
Date: Thu Aug  7 04:15:01 2008
New Revision: 683583

URL: http://svn.apache.org/viewvc?rev=683583&view=rev
Log:
QPID-1218: Boost broker performance by lots.

AMQMessage: Allow references to be incremented in a pile

IncomingMessage: Increment message references in one go, flatten delivery loop a little.
Make _destinationQueues an ArrayList, massively increasing performance. Iter
ate through it with indexing

AccessResult: don't use StringBuilder so much

Update tests and exchanges to reflect new API usage, almost all of this is just type narrowing except for Topic where there's an extra copy, but it isn't too bad relative to the number of HashSet and HashMap operations that go on inside there.

Modified:
    incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java

Modified: incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Thu Aug  7 04:15:01 2008
@@ -204,7 +204,7 @@
         ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
         AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
 
-        Collection<AMQQueue> queues =  new ArrayList<AMQQueue>();
+        ArrayList<AMQQueue> queues =  new ArrayList<AMQQueue>();
         queues.add(q);
         payload.enqueue(queues);
         

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Thu Aug  7 04:15:01 2008
@@ -191,7 +191,7 @@
 
         final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
 
-        final List<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
+        final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
 
         if (_logger.isDebugEnabled())
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Thu Aug  7 04:15:01 2008
@@ -249,7 +249,7 @@
             _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
         }
         boolean routed = false;
-        Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
         for (Registration e : _bindings)
         {
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java Thu Aug  7 04:15:01 2008
@@ -37,12 +37,12 @@
  */
 class Index
 {
-    private ConcurrentMap<AMQShortString, List<AMQQueue>> _index
-            = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index
+            = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>();
 
     synchronized boolean add(AMQShortString key, AMQQueue queue)
     {
-        List<AMQQueue> queues = _index.get(key);
+        ArrayList<AMQQueue> queues = _index.get(key);
         if(queues == null)
         {
             queues = new ArrayList<AMQQueue>();
@@ -66,7 +66,7 @@
 
     synchronized boolean remove(AMQShortString key, AMQQueue queue)
     {
-        List<AMQQueue> queues = _index.get(key);
+        ArrayList<AMQQueue> queues = _index.get(key);
         if (queues != null)
         {
             queues = new ArrayList<AMQQueue>(queues);
@@ -87,7 +87,7 @@
         return false;
     }
 
-    List<AMQQueue> get(AMQShortString key)
+    ArrayList<AMQQueue> get(AMQShortString key)
     {
         return _index.get(key);
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Thu Aug  7 04:15:01 2008
@@ -32,7 +32,6 @@
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.exchange.topic.TopicParser;
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
@@ -48,9 +47,6 @@
 import javax.management.openmbean.TabularDataSupport;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.lang.ref.WeakReference;
 
 public class TopicExchange extends AbstractExchange
@@ -532,7 +528,10 @@
 
         final AMQShortString routingKey = payload.getRoutingKey();
 
-        Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey);
+        // The copy here is unfortunate, but not too bad relevant to the amount of
+        // things created and copied in getMatchedQueues
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+        queues.addAll(getMatchedQueues(payload, routingKey));
 
         if(queues == null || queues.isEmpty())
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Thu Aug  7 04:15:01 2008
@@ -291,12 +291,17 @@
         return this;
     }
 
-    /** Threadsafe. Increment the reference count on the message. */
     public boolean incrementReference()
     {
-        if(_referenceCount.incrementAndGet() <= 1)
+        return incrementReference(1);
+    }
+
+    /* Threadsafe. Increment the reference count on the message. */
+    public boolean incrementReference(int count)
+    {
+        if(_referenceCount.addAndGet(count) <= 1)
         {
-            _referenceCount.decrementAndGet();
+            _referenceCount.addAndGet(-count);
             return false;
         }
         else

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Thu Aug  7 04:15:01 2008
@@ -34,6 +34,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 public class IncomingMessage implements Filterable<RuntimeException>
@@ -63,7 +64,7 @@
      * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
      * by the message handle.
      */
-    private Collection<AMQQueue> _destinationQueues;
+    private ArrayList<AMQQueue> _destinationQueues;
 
     private AMQProtocolSession _publisher;
     private MessageStore _messageStore;
@@ -134,21 +135,13 @@
 
             if(_destinationQueues != null)
             {
-                for (AMQQueue q : _destinationQueues)
+                for (int i = 0; i < _destinationQueues.size(); i++)
                 {
-                    if(q.isDurable())
-                    {
-
-                        _messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId);
-                    }
+                    _messageStore.enqueueMessage(_txnContext.getStoreContext(),
+                            _destinationQueues.get(i), _messageId);
                 }
             }
-
         }
-
-
-
-
     }
 
     public AMQMessage deliverToQueues()
@@ -157,10 +150,9 @@
 
         // we get a reference to the destination queues now so that we can clear the
         // transient message data as quickly as possible
-        Collection<AMQQueue> destinationQueues = _destinationQueues;
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Delivering message " + _messageId + " to " + destinationQueues);
+            _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
         }
 
         AMQMessage message = null;
@@ -178,10 +170,7 @@
             message.setExpiration(_expiration);
             message.setClientIdentifier(_publisher.getSessionIdentifier());
 
-
-
-
-            if ((destinationQueues == null) || destinationQueues.isEmpty())
+            if ((_destinationQueues == null) || _destinationQueues.size() == 0)
             {
 
                 if (isMandatory() || isImmediate())
@@ -196,10 +185,9 @@
             }
             else
             {
-                // TODO
-
                 int offset;
-                final int queueCount = destinationQueues.size();
+                final int queueCount = _destinationQueues.size();
+                message.incrementReference(queueCount);
                 if(queueCount == 1)
                 {
                     offset = 0;
@@ -212,33 +200,16 @@
                         offset = -offset;
                     }
                 }
-
-                int i = 0;
-                for (AMQQueue q : destinationQueues)
+                for (int i = offset; i < queueCount; i++)
                 {
-                    if(++i > offset)
-                    {
-                        // Increment the references to this message for each queue delivery.
-                        message.incrementReference();
-                        // normal deliver so add this message at the end.
-                        _txnContext.deliver(q, message);
-                    }
+                    // normal deliver so add this message at the end.
+                    _txnContext.deliver(_destinationQueues.get(i), message);
                 }
-                i = 0;
-                if(offset != 0)
+                for (int i = 0; i < offset; i++)
                 {
-                    for (AMQQueue q : destinationQueues)
-                    {
-                        if(i++ < offset)
-                        {
-                            // Increment the references to this message for each queue delivery.
-                            message.incrementReference();
-                            // normal deliver so add this message at the end.
-                            _txnContext.deliver(q, message);
-                        }
-                    }
+                    // normal deliver so add this message at the end.
+                    _txnContext.deliver(_destinationQueues.get(i), message);
                 }
-
             }
 
             // we then allow the transactional context to do something with the message content
@@ -329,7 +300,7 @@
         _exchange.route(this);
     }
 
-    public void enqueue(final Collection<AMQQueue> queues)
+    public void enqueue(final ArrayList<AMQQueue> queues)
     {
         _destinationQueues = queues;
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java Thu Aug  7 04:15:01 2008
@@ -27,23 +27,23 @@
         GRANTED, REFUSED
     }
 
-    StringBuilder _authorizer;
-    AccessStatus _status;
+    private String _authorizer;
+    private AccessStatus _status;
 
     public AccessResult(ACLPlugin authorizer, AccessStatus status)
     {
         _status = status;
-        _authorizer = new StringBuilder(authorizer.getPluginName());
+        _authorizer = authorizer.getPluginName();
     }
 
     public void setAuthorizer(ACLPlugin authorizer)
     {
-        _authorizer.append(authorizer.getPluginName());
+        _authorizer += authorizer.getPluginName();
     }
 
     public String getAuthorizer()
     {
-        return _authorizer.toString();
+        return _authorizer;
     }
 
     public void setStatus(AccessStatus status)
@@ -58,8 +58,7 @@
 
     public void addAuthorizer(ACLPlugin accessManager)
     {
-        _authorizer.insert(0, "->");
-        _authorizer.insert(0, accessManager.getPluginName());
+        _authorizer = accessManager.getPluginName() + "->" + _authorizer;
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Thu Aug  7 04:15:01 2008
@@ -43,6 +43,8 @@
 import org.apache.mina.common.ByteBuffer;
 
 import javax.management.Notification;
+
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Collections;
 
@@ -304,7 +306,9 @@
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message(false, size);
-            messages[i].enqueue(Collections.singleton(_queue));
+            ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+            qs.add(_queue);
+            messages[i].enqueue(qs);
             messages[i].routingComplete(_messageStore, new MessageHandleFactory());
 
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Thu Aug  7 04:15:01 2008
@@ -47,6 +47,8 @@
 import org.apache.mina.common.ByteBuffer;
 
 import javax.management.JMException;
+
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Collections;
 
@@ -216,8 +218,9 @@
         IncomingMessage msg = message(false, false);
         long id = msg.getMessageId();
         _queue.clearQueue(_storeContext);
-
-        msg.enqueue(Collections.singleton(_queue));
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        qs.add(_queue);
+        msg.enqueue(qs);
         msg.routingComplete(_messageStore, new MessageHandleFactory());
 
         msg.addContentBodyFrame(new ContentChunk()
@@ -319,7 +322,9 @@
         for (int i = 0; i < messageCount; i++)
         {
             IncomingMessage currentMessage = message(false, persistent);
-            currentMessage.enqueue(Collections.singleton(_queue));
+            ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+            qs.add(_queue);
+            currentMessage.enqueue(qs);
 
             // route header
             currentMessage.routingComplete(_messageStore, new MessageHandleFactory());

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Thu Aug  7 04:15:01 2008
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Set;
 import java.util.Collections;
@@ -146,7 +147,9 @@
             // we increment the reference here since we are not delivering the messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to construct if we have direct access to the
             // subscription
-            msg.enqueue(Collections.singleton(_queue));
+            ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+            qs.add(_queue);
+            msg.enqueue(qs);
             msg.routingComplete(_messageStore, factory);
             if(msg.allContentReceived())
             {