You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/07/17 13:38:13 UTC
svn commit: r556890 - in /incubator/qpid/branches/M2/java:
broker/src/main/java/org/apache/qpid/server/handler/
common/src/main/java/org/apache/qpid/framing/
Author: ritchiem
Date: Tue Jul 17 04:38:10 2007
New Revision: 556890
URL: http://svn.apache.org/viewvc?view=rev&rev=556890
Log:
QPID-541 A large portion of memory was being wasted in 32k ByteBuffers being held by the AMQShortStrings.
Patch submitted by Robert Godfrey to intern() the AMQSSs to reduce memory usage. Current implementation *will* impact performance due to the usage of a static Map for storage. However, a thread local implementation is in the works.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Jul 17 04:38:10 2007
@@ -98,6 +98,12 @@
}
else
{
+
+ if (body.consumerTag != null)
+ {
+ body.consumerTag = body.consumerTag.intern();
+ }
+
try
{
AMQShortString consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
@@ -136,15 +142,15 @@
// If the above doesn't work then perhaps this is wrong too.
// throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
// "Non-unique consumer tag, '" + body.consumerTag + "'");
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId
+ BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Jul 17 04:38:10 2007
@@ -67,6 +67,10 @@
body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
+ else
+ {
+ body.exchange = body.exchange.intern();
+ }
VirtualHost vHost = session.getVirtualHost();
Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
// if the exchange does not exist we raise a channel exception
@@ -86,10 +90,16 @@
throw body.getChannelNotFoundException(evt.getChannelId());
}
+ if(body.routingKey != null)
+ {
+ body.routingKey = body.routingKey.intern();
+ }
+
MessagePublishInfo info = session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
channel.setPublishFrame(info, session);
}
}
}
+
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Jul 17 04:38:10 2007
@@ -83,7 +83,9 @@
try
{
- exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+ exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
+ body.type == null ? null : body.type.intern(),
+ body.durable,
body.passive, body.ticket);
exchangeRegistry.registerExchange(exchange);
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Jul 17 04:38:10 2007
@@ -97,6 +97,12 @@
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
}
+
+ if (body.routingKey != null)
+ {
+ body.routingKey = body.routingKey.intern();
+ }
+
try
{
if (!exch.isBound(body.routingKey, body.arguments, queue))
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Jul 17 04:38:10 2007
@@ -91,8 +91,15 @@
synchronized (queueRegistry)
{
+
+
if (((queue = queueRegistry.getQueue(body.queue)) == null))
{
+ if(body.queue != null)
+ {
+ body.queue = body.queue.intern();
+ }
+
if (body.passive)
{
String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
--- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Tue Jul 17 04:38:10 2007
@@ -26,6 +26,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.lang.ref.WeakReference;
+
/**
* A short string is a representation of an AMQ Short String
* Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
@@ -34,6 +38,10 @@
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+
+ private static final Map<AMQShortString, WeakReference<AMQShortString>> internMap =
+ new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
private final ByteBuffer _data;
@@ -43,7 +51,6 @@
public AMQShortString(byte[] data)
{
-
_data = ByteBuffer.wrap(data);
_length = data.length;
}
@@ -374,6 +381,29 @@
}
return (length() == name.length()) ? 0 : -1;
+ }
+ }
+
+
+ public AMQShortString intern()
+ {
+ hashCode();
+ synchronized(internMap)
+ {
+
+ WeakReference<AMQShortString> ref = internMap.get(this);
+ if(ref != null)
+ {
+ AMQShortString internString = ref.get();
+ if(internString != null)
+ {
+ return internString;
+ }
+ }
+
+ AMQShortString internString = new AMQShortString(getBytes());
+ internMap.put(internString, new WeakReference(internString));
+ return internString;
}
}
}