You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/07/17 13:38:13 UTC

svn commit: r556890 - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/qpid/server/handler/ common/src/main/java/org/apache/qpid/framing/

Author: ritchiem
Date: Tue Jul 17 04:38:10 2007
New Revision: 556890

URL: http://svn.apache.org/viewvc?view=rev&rev=556890
Log:
QPID-541 A large portion of memory was being wasted in 32k ByteBuffers held by AMQShortStrings.

Patch submitted by Robert Godfrey to intern() the AMQShortStrings to reduce memory usage. The current implementation *will* impact performance because it uses a static Map for storage. However, a thread-local implementation is in the works.

Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Jul 17 04:38:10 2007
@@ -98,6 +98,12 @@
             }
             else
             {
+
+                if (body.consumerTag != null)
+                {
+                    body.consumerTag = body.consumerTag.intern();
+                }
+
                 try
                 {
                     AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
@@ -136,15 +142,15 @@
                     // If the above doesn't work then perhaps this is wrong too.
 //                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
 //                                                      "Non-unique consumer tag, '" + body.consumerTag + "'");
-                                        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
                     // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                     // Be aware of possible changes to parameter order as versions change.
                     session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
-                        (byte)8, (byte)0,	// AMQP version (major, minor)
-                        BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
-                        BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
-                        AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
-                        msg));	// replyText
+                                                                          (byte) 8, (byte) 0,    // AMQP version (major, minor)
+                                                                          BasicConsumeBody.getClazz((byte) 8, (byte) 0),    // classId
+                                                                          BasicConsumeBody.getMethod((byte) 8, (byte) 0),    // methodId
+                                                                          AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
+                                                                          msg));    // replyText
                 }
                 catch (AMQQueue.ExistingExclusiveSubscription e)
                 {

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Jul 17 04:38:10 2007
@@ -67,6 +67,10 @@
             body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
 
         }
+        else
+        {
+            body.exchange = body.exchange.intern();
+        }
         VirtualHost vHost = session.getVirtualHost();
         Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
         // if the exchange does not exist we raise a channel exception
@@ -86,10 +90,16 @@
                 throw body.getChannelNotFoundException(evt.getChannelId());
             }
 
+            if(body.routingKey != null)
+            {
+                body.routingKey = body.routingKey.intern();
+            }
+            
             MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
             channel.setPublishFrame(info, session);
         }
     }
 }
+
 
 

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Jul 17 04:38:10 2007
@@ -83,7 +83,9 @@
                     try
                     {
 
-                    exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+                    exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
+                                                              body.type == null ? null : body.type.intern(), 
+                                                              body.durable,
                                                               body.passive, body.ticket);
                     exchangeRegistry.registerExchange(exchange);
                     }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Jul 17 04:38:10 2007
@@ -97,6 +97,12 @@
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
         }
+
+        if (body.routingKey != null)
+        {
+            body.routingKey = body.routingKey.intern();
+        }
+
         try
         {
             if (!exch.isBound(body.routingKey, body.arguments, queue))

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Jul 17 04:38:10 2007
@@ -91,8 +91,15 @@
         synchronized (queueRegistry)
         {
 
+
+
             if (((queue = queueRegistry.getQueue(body.queue)) == null))
             {
+                if(body.queue != null)
+                {
+                    body.queue = body.queue.intern();
+                }
+
                 if (body.passive)
                 {
                     String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";

Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Tue Jul 17 04:38:10 2007
@@ -26,6 +26,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.lang.ref.WeakReference;
+
 /**
  * A short string is a representation of an AMQ Short String
  * Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
@@ -34,6 +38,10 @@
  */
 public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
 {
+
+    private static final Map<AMQShortString, WeakReference<AMQShortString>> internMap =
+            new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+
     private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
 
     private final ByteBuffer _data;
@@ -43,7 +51,6 @@
 
     public AMQShortString(byte[] data)
     {
-
         _data = ByteBuffer.wrap(data);
         _length = data.length;
     }
@@ -374,6 +381,29 @@
             }
 
             return (length() == name.length()) ? 0 : -1;
+        }
+    }
+
+
+    /**
+     * Returns the canonical shared instance for this short string's content, registering a copy on first use.
+     * @return the instance held in the intern map; it may be a different object from this one
+     */
+    public AMQShortString intern()
+    {
+        hashCode(); // result unused; presumably primes a cached hash before locking - TODO confirm
+        synchronized (internMap)
+        {
+            // internMap is a plain static WeakHashMap shared by every caller, so all access is locked.
+            WeakReference<AMQShortString> ref = internMap.get(this);
+            AMQShortString internString = (ref == null) ? null : ref.get();
+            if (internString == null)
+            {
+                // No live canonical instance: register a fresh copy built from getBytes().
+                internString = new AMQShortString(getBytes());
+                internMap.put(internString, new WeakReference<AMQShortString>(internString));
+            }
+            return internString;
         }
     }
 }