You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 11:59:38 UTC
svn commit: r501003 - in /incubator/qpid/trunk/qpid: gentools/templ.java/
java/broker/src/main/java/org/apache/qpid/server/handler/
java/broker/src/main/java/org/apache/qpid/server/protocol/
java/broker/src/main/java/org/apache/qpid/server/queue/ java/...
Author: rgreig
Date: Mon Jan 29 02:59:33 2007
New Revision: 501003
URL: http://svn.apache.org/viewvc?view=rev&rev=501003
Log:
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering protocol version
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
Modified:
incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl
incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
Modified: incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl (original)
+++ incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl Mon Jan 29 02:59:33 2007
@@ -42,6 +42,12 @@
{
return new ${CLASS}${METHOD}Body(major, minor, in);
}
+
+ public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+ {
+ return new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID, in);
+ }
+
};
public static AMQMethodBodyInstanceFactory getFactory()
@@ -82,12 +88,18 @@
public ${CLASS}${METHOD}Body(byte major, byte minor, ByteBuffer buffer) throws AMQFrameDecodingException
{
+ this(major, minor, getClazz(major,minor), getMethod(major,minor), buffer);
+ }
+
+ public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+
super(major, minor);
- _clazz = getClazz(major,minor);
- _method = getMethod(major,minor);
+ _clazz = clazzID;
+ _method = methodID;
%{FLIST} ${mb_field_decode}
}
- public ${CLASS}${METHOD}Body(byte major, byte minor
+ public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID
%{FLIST} ${mb_field_parameter_list}
)
{
@@ -149,7 +161,19 @@
%{FLIST} ${mb_field_parameter_list}
)
{
- ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor
+ return createAMQFrame(channelId, major, minor, getClazz(major,minor), getMethod(major,minor)
+%{FLIST} ${mb_field_passed_parameter_list}
+ );
+
+
+
+ }
+
+ public static AMQFrame createAMQFrame(int channelId, byte major, byte minor, int clazzID, int methodID
+%{FLIST} ${mb_field_parameter_list}
+ )
+ {
+ ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID
%{FLIST} ${mb_field_passed_parameter_list}
);
@@ -157,4 +181,5 @@
AMQFrame frame = new AMQFrame(channelId, body);
return frame;
}
+
}
Modified: incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl (original)
+++ incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl Mon Jan 29 02:59:33 2007
@@ -33,13 +33,19 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-class MainRegistry
+public class MainRegistry
{
private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
private static final Logger _log = Logger.getLogger(MainRegistry.class);
+
+ private static final int DEFAULT_MINOR_VERSION_COUNT = 10;
+ private static final int DEFAULT_MAJOR_VERSION_COUNT = 10;
+
+ private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][];
+
static
{
%{CLIST} ${reg_map_put_method}
@@ -48,7 +54,9 @@
public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
throws AMQFrameDecodingException
{
- AMQMethodBodyInstanceFactory bodyFactory = classIDMethodIDVersionBodyMap.get(createMapKey(classID,methodID,major,minor));
+ VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor);
+ AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID);
+
if (bodyFactory == null)
{
throw new AMQFrameDecodingException(_log,
@@ -59,23 +67,65 @@
}
+
+ public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor)
+ {
+ try
+ {
+ return _specificRegistries[(int)major][(int)minor];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+
+
+ }
+ private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor)
+ {
+ VersionSpecificRegistry[][] registries = _specificRegistries;
+ if(major >= registries.length)
+ {
+ _specificRegistries = new VersionSpecificRegistry[(int)major + 1][];
+ System.arraycopy(registries, 0, _specificRegistries, 0, registries.length);
+ registries = _specificRegistries;
+ }
+ if(registries[major] == null)
+ {
+ registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ];
+ }
+ else if(registries[major].length <= minor)
+ {
+ VersionSpecificRegistry[] minorArray = registries[major];
+ registries[major] = new VersionSpecificRegistry[ minor + 1 ];
+ System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length);
+
+ }
+
+ VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor);
+
+ registries[major][minor] = newRegistry;
+
+ return newRegistry;
+ }
+
private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
{
- classIDMethodIDVersionBodyMap.put(createMapKey(classID,methodID,major,minor), instanceFactory);
+ VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor);
+ if(registry == null)
+ {
+ registry = addVersionSpecificRegistry(major,minor);
+
+ }
+
+ registry.registerMethod(classID, methodID, instanceFactory);
+
}
- private static Long createMapKey(short classID, short methodID, byte major, byte minor)
- {
- /**
- * Mapping of 4 components into a guaranteed unique key:
- * MSB LSB
- * +----+----+----+----+----+----+-----+-----+
- * | 0 | classID |methodID |major|minor|
- * +----+----+----+----+----+----+-----+-----+
- */
- return new Long(((long)classID << 32) + ((long)methodID << 16) + ((long)major << 8) + minor);
- }
-
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Mon Jan 29 02:59:33 2007
@@ -50,6 +50,6 @@
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
+ session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0));
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 29 02:59:33 2007
@@ -61,6 +61,9 @@
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ // to save boxing the channelId and looking up in a map... cache in an array the low numbered
+ // channels. This value must be of the form 2^x - 1.
+ private static final int CHANNEL_CACHE_SIZE = 0xff;
private final IoSession _minaProtocolSession;
@@ -70,6 +73,8 @@
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1];
+
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
@@ -89,10 +94,12 @@
private long _maxNoOfChannels = 1000;
/* AMQP Version for this session */
- private byte _major;
- private byte _minor;
+ private byte _major = pv[pv.length-1][PROTOCOL_MAJOR];
+ private byte _minor = pv[pv.length-1][PROTOCOL_MINOR];
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
+
public ManagedObject getManagedObject()
@@ -165,11 +172,14 @@
try
{
pi.checkVersion(this); // Fails if not correct
+
// This sets the protocol version (and hence framing classes) for this session.
- _major = pi.protocolMajor;
- _minor = pi.protocolMinor;
+ setProtocolVersion(pi.protocolMajor,pi.protocolMinor);
+
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+
String locales = "en_US";
+
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
_major, _minor, // AMQP version (major, minor)
@@ -200,7 +210,7 @@
{
AMQFrame frame = (AMQFrame) message;
- if (frame.bodyFrame instanceof AMQMethodBody)
+ if (frame.getBodyFrame() instanceof AMQMethodBody)
{
methodFrameReceived(frame);
}
@@ -217,8 +227,8 @@
{
_logger.debug("Method frame received: " + frame);
}
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
- (AMQMethodBody) frame.bodyFrame);
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
+ (AMQMethodBody) frame.getBodyFrame());
try
{
try
@@ -241,14 +251,14 @@
catch (AMQChannelException e)
{
_logger.error("Closing channel due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.channel));
- closeChannel(frame.channel);
+ writeFrame(e.getCloseFrame(frame.getChannel()));
+ closeChannel(frame.getChannel());
}
catch (AMQConnectionException e)
{
_logger.error("Closing connection due to: " + e.getMessage());
closeSession();
- writeFrame(e.getCloseFrame(frame.channel));
+ writeFrame(e.getCloseFrame(frame.getChannel()));
}
}
catch (Exception e)
@@ -264,15 +274,15 @@
private void contentFrameReceived(AMQFrame frame) throws AMQException
{
- if (frame.bodyFrame instanceof ContentHeaderBody)
+ if (frame.getBodyFrame() instanceof ContentHeaderBody)
{
contentHeaderReceived(frame);
}
- else if (frame.bodyFrame instanceof ContentBody)
+ else if (frame.getBodyFrame() instanceof ContentBody)
{
contentBodyReceived(frame);
}
- else if (frame.bodyFrame instanceof HeartbeatBody)
+ else if (frame.getBodyFrame() instanceof HeartbeatBody)
{
_logger.debug("Received heartbeat from client");
}
@@ -288,7 +298,7 @@
{
_logger.debug("Content header frame received: " + frame);
}
- getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
+ getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -297,7 +307,7 @@
{
_logger.debug("Content body frame received: " + frame);
}
- getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame, this);
+ getChannel(frame.getChannel()).publishContentBody((ContentBody)frame.getBodyFrame(), this);
}
/**
@@ -329,7 +339,9 @@
public AMQChannel getChannel(int channelId) throws AMQException
{
- return _channelMap.get(channelId);
+ return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ ? _cachedChannels[channelId]
+ : _channelMap.get(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
@@ -339,7 +351,13 @@
throw new AMQException("Session is closed");
}
- _channelMap.put(channel.getChannelId(), channel);
+ final int channelId = channel.getChannelId();
+ _channelMap.put(channelId, channel);
+
+ if(((channelId & CHANNEL_CACHE_SIZE) == channelId))
+ {
+ _cachedChannels[channelId] = channel;
+ }
checkForNotification();
}
@@ -389,7 +407,7 @@
*/
public void closeChannel(int channelId) throws AMQException
{
- final AMQChannel channel = _channelMap.get(channelId);
+ final AMQChannel channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
@@ -402,7 +420,8 @@
}
finally
{
- _channelMap.remove(channelId);
+ removeChannel(channelId);
+
}
}
}
@@ -415,6 +434,10 @@
public void removeChannel(int channelId)
{
_channelMap.remove(channelId);
+ if((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ {
+ _cachedChannels[channelId] = null;
+ }
}
/**
@@ -444,6 +467,10 @@
channel.close(this);
}
_channelMap.clear();
+ for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
+ {
+ _cachedChannels[i]=null;
+ }
}
/**
@@ -534,10 +561,12 @@
}
}
- /**
- * Convenience methods for managing AMQP version.
- * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
- */
+ private void setProtocolVersion(byte major, byte minor)
+ {
+ _major = major;
+ _minor = minor;
+ _registry = MainRegistry.getVersionSpecificRegistry(major,minor);
+ }
public byte getProtocolMajorVersion()
{
@@ -553,6 +582,12 @@
{
return _major == major && _minor == minor;
}
+
+ public VersionSpecificRegistry getRegistry()
+ {
+ return _registry;
+ }
+
public Object getClientIdentifier()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Mon Jan 29 02:59:33 2007
@@ -120,7 +120,10 @@
_logger.info("Protocol Session closed");
final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
//fixme -- this can be null
- amqProtocolSession.closeSession();
+ if(amqProtocolSession != null)
+ {
+ amqProtocolSession.closeSession();
+ }
}
public void sessionIdle(IoSession session, IdleStatus status) throws Exception
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 29 02:59:33 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.AMQException;
@@ -31,7 +32,7 @@
import javax.security.sasl.SaslServer;
-public interface AMQProtocolSession extends AMQProtocolWriter
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
{
@@ -143,9 +144,5 @@
void addSessionCloseTask(Task task);
void removeSessionCloseTask(Task task);
-
- byte getProtocolMajorVersion();
-
- byte getProtocolMinorVersion();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jan 29 02:59:33 2007
@@ -53,7 +53,7 @@
*/
private AMQProtocolSession _publisher;
- private final long _messageId;
+ private final Long _messageId;
private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -68,6 +68,13 @@
* messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
+ /**
+ * We need to keep track of whether the message was 'immediate'
+ * as in extreme circumstances, when the checkDelieveredToConsumer
+ * is called, the message may already have been received and acknowledged,
+ * and the body removed from the store.
+ */
+ private boolean _immediate;
private AtomicBoolean _taken = new AtomicBoolean(false);
@@ -160,11 +167,12 @@
}
}
- public AMQMessage(long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, BasicPublishBody publishBody,
TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
+ _immediate = publishBody.immediate;
_transientMessageData.setPublishBody(publishBody);
_taken = new AtomicBoolean(false);
@@ -183,7 +191,7 @@
* @param factory
* @throws AMQException
*/
- public AMQMessage(long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
+ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
@@ -198,7 +206,7 @@
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, BasicPublishBody publishBody,
TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, publishBody, txnContext);
@@ -216,7 +224,7 @@
* @param contentBodies
* @throws AMQException
*/
- public AMQMessage(long messageId, BasicPublishBody publishBody,
+ public AMQMessage(Long messageId, BasicPublishBody publishBody,
TransactionalContext txnContext,
ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext,
@@ -293,8 +301,9 @@
public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException
{
_transientMessageData.addBodyLength(contentBody.getSize());
- _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody);
- if (isAllContentReceived())
+ final boolean allContentReceived = isAllContentReceived();
+ _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody, allContentReceived);
+ if (allContentReceived)
{
deliver(storeContext);
return true;
@@ -348,6 +357,7 @@
{
_log.debug("Ref count on message " + _messageId + " is zero; removing message");
}
+
// must check if the handle is null since there may be cases where we decide to throw away a message
// and the handle has not yet been constructed
if (_messageHandle != null)
@@ -372,6 +382,10 @@
Thread.dumpStack();
}
}
+ if(_referenceCount.get()<0)
+ {
+ throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
+ }
}
}
@@ -464,8 +478,8 @@
*/
public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
{
- BasicPublishBody pb = getPublishBody();
- if (pb.immediate && !_deliveredToConsumer)
+
+ if (_immediate && !_deliveredToConsumer)
{
throw new NoConsumersException(this);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Mon Jan 29 02:59:33 2007
@@ -35,17 +35,17 @@
*/
public interface AMQMessageHandle
{
- ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException;
+ ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException;
/**
* @return the number of body frames associated with this message
*/
- int getBodyCount(long messageId) throws AMQException;
+ int getBodyCount(Long messageId) throws AMQException;
/**
* @return the size of the body
*/
- long getBodySize(long messageId) throws AMQException;
+ long getBodySize(Long messageId) throws AMQException;
/**
* Get a particular content body
@@ -53,25 +53,25 @@
* @return a content body
* @throws IllegalArgumentException if the index is invalid
*/
- ContentBody getContentBody(long messageId, int index) throws IllegalArgumentException, AMQException;
+ ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException;
- void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException;
+ void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
- BasicPublishBody getPublishBody(long messageId) throws AMQException;
+ BasicPublishBody getPublishBody(Long messageId) throws AMQException;
boolean isRedelivered();
void setRedelivered(boolean redelivered);
- boolean isPersistent(long messageId) throws AMQException;
+ boolean isPersistent(Long messageId) throws AMQException;
- void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+ void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException;
- void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
+ void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
- void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
+ void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
- void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
+ void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jan 29 02:59:33 2007
@@ -252,6 +252,12 @@
return _deliveryMgr.getMessages();
}
+ public long getQueueDepth()
+ {
+ return _deliveryMgr.getTotalMessageSize();
+ }
+
+
/**
* @param messageId
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
@@ -315,13 +321,13 @@
_maxMessageCount = value;
}
- public Long getMaximumQueueDepth()
+ public long getMaximumQueueDepth()
{
return _maxQueueDepth;
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
+ public void setMaximumQueueDepth(long value)
{
_maxQueueDepth = value;
}
@@ -624,5 +630,7 @@
{
_deleteTaskList.add(task);
}
+
+
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Jan 29 02:59:33 2007
@@ -191,25 +191,13 @@
*/
public Long getQueueDepth() throws JMException
{
- List<AMQMessage> list = _queue.getMessagesOnTheQueue();
- if (list.size() == 0)
- {
- return 0l;
- }
+ return getQueueDepthKb();
+ }
- long queueDepth = 0;
- try
- {
- for (AMQMessage message : list)
- {
- queueDepth = queueDepth + getMessageSize(message);
- }
- }
- catch (AMQException e)
- {
- throw new JMException("Unable to get message size: " + e);
- }
- return (long) Math.round(queueDepth / 1000);
+ public long getQueueDepthKb()
+ {
+ long queueBytesSize = _queue.getQueueDepth();
+ return queueBytesSize >> 10 ;
}
/**
@@ -245,7 +233,7 @@
}
// Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
+ long queueDepth = getQueueDepthKb();
if (queueDepth >= _queue.getMaximumQueueDepth())
{
notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Jan 29 02:59:33 2007
@@ -38,6 +38,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -83,6 +84,7 @@
* Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
+ private AtomicLong _totalMessageSize = new AtomicLong();
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
@@ -116,6 +118,8 @@
_messages.offer(msg);
+ _totalMessageSize.addAndGet(msg.getSize());
+
return true;
}
@@ -150,6 +154,13 @@
}
+
+ public long getTotalMessageSize()
+ {
+ return _totalMessageSize.get();
+ }
+
+
public synchronized List<AMQMessage> getMessages()
{
return new ArrayList<AMQMessage>(_messages);
@@ -213,6 +224,7 @@
}
msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+ _totalMessageSize.addAndGet(-msg.getSize());
}
}
finally
@@ -224,6 +236,7 @@
}
}
+
public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
@@ -231,6 +244,7 @@
{
msg.dequeue(storeContext, _queue);
}
+ _totalMessageSize.getAndAdd(-msg.getSize());
}
public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException
@@ -241,8 +255,11 @@
{
msg.dequeue(storeContext, _queue);
count++;
+ _totalMessageSize.set(0L);
msg = poll();
+
}
+
return count;
}
@@ -292,6 +309,7 @@
//remove sent message from our queue.
messageQueue.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
}
catch (AMQException e)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Mon Jan 29 02:59:33 2007
@@ -81,4 +81,6 @@
void populatePreDeliveryQueue(Subscription subscription);
boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
+
+ long getTotalMessageSize();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Mon Jan 29 02:59:33 2007
@@ -47,22 +47,22 @@
{
}
- public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
{
return _contentHeaderBody;
}
- public int getBodyCount(long messageId)
+ public int getBodyCount(Long messageId)
{
return _contentBodies.size();
}
- public long getBodySize(long messageId) throws AMQException
+ public long getBodySize(Long messageId) throws AMQException
{
return getContentHeaderBody(messageId).bodySize;
}
- public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -72,13 +72,13 @@
return _contentBodies.get(index);
}
- public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody)
+ public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody)
throws AMQException
{
_contentBodies.add(contentBody);
}
- public BasicPublishBody getPublishBody(long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(Long messageId) throws AMQException
{
return _publishBody;
}
@@ -94,7 +94,7 @@
_redelivered = redelivered;
}
- public boolean isPersistent(long messageId) throws AMQException
+ public boolean isPersistent(Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
ContentHeaderBody chb = getContentHeaderBody(messageId);
@@ -108,7 +108,7 @@
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
@@ -116,17 +116,17 @@
_contentHeaderBody = contentHeaderBody;
}
- public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
{
// NO OP
}
- public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
{
// NO OP
}
- public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
{
// NO OP
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java Mon Jan 29 02:59:33 2007
@@ -32,4 +32,8 @@
{
super("Failed to cleanup message with id " + messageId, e);
}
+ public MessageCleanupException(String message)
+ {
+ super(message);
+ }
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java Mon Jan 29 02:59:33 2007
@@ -31,7 +31,7 @@
public class MessageHandleFactory
{
- public AMQMessageHandle createMessageHandle(long messageId, MessageStore store, boolean persistent)
+ public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent)
{
// just hardcoded for now
if (persistent)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Mon Jan 29 02:59:33 2007
@@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedList;
+import java.util.Collections;
/**
* @author Robert Greig (robert.j.greig@jpmorgan.com)
@@ -48,12 +49,13 @@
private final MessageStore _messageStore;
+
public WeakReferenceMessageHandle(MessageStore messageStore)
{
_messageStore = messageStore;
}
- public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+ public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
{
ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null);
if (chb == null)
@@ -66,7 +68,7 @@
return chb;
}
- public int getBodyCount(long messageId) throws AMQException
+ public int getBodyCount(Long messageId) throws AMQException
{
if (_contentBodies == null)
{
@@ -81,12 +83,12 @@
return _contentBodies.size();
}
- public long getBodySize(long messageId) throws AMQException
+ public long getBodySize(Long messageId) throws AMQException
{
return getContentHeaderBody(messageId).bodySize;
}
- public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+ public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
{
if (index > _contentBodies.size() - 1)
{
@@ -108,19 +110,30 @@
* @param storeContext
* @param messageId
* @param contentBody
+ * @param isLastContentBody
* @throws AMQException
*/
- public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException
+ public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException
{
- if (_contentBodies == null)
+ if(_contentBodies == null && isLastContentBody)
{
- _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ _contentBodies = Collections.singletonList(new WeakReference<ContentBody>(contentBody));
+
+ }
+ else
+ {
+ if (_contentBodies == null)
+ {
+ _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ }
+
+
+ _contentBodies.add(new WeakReference<ContentBody>(contentBody));
}
- _contentBodies.add(new WeakReference<ContentBody>(contentBody));
- _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody);
+ _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
}
- public BasicPublishBody getPublishBody(long messageId) throws AMQException
+ public BasicPublishBody getPublishBody(Long messageId) throws AMQException
{
BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
if (bpb == null)
@@ -143,7 +156,7 @@
_redelivered = redelivered;
}
- public boolean isPersistent(long messageId) throws AMQException
+ public boolean isPersistent(Long messageId) throws AMQException
{
//todo remove literal values to a constant file such as AMQConstants in common
ContentHeaderBody chb = getContentHeaderBody(messageId);
@@ -157,7 +170,7 @@
* @param contentHeaderBody
* @throws AMQException
*/
- public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody)
throws AMQException
{
@@ -173,17 +186,17 @@
_contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
}
- public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+ public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
{
_messageStore.removeMessage(storeContext, messageId);
}
- public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
{
_messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
}
- public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
{
_messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Mon Jan 29 02:59:33 2007
@@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.EnumMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
@@ -59,8 +60,9 @@
* Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
* The class must be a subclass of AMQFrame.
*/
- private final Map<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
- new HashMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>();
+ private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
+ new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class);
+
private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
@@ -83,14 +85,7 @@
protected void registerListeners()
{
- Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap =
- new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-
- // we need to register a map for the null (i.e. all state) handlers otherwise you get
- // a stack overflow in the handler searching code when you present it with a frame for which
- // no handlers are registered
- //
- _state2HandlersMap.put(null, frame2handlerMap);
+ Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap;
frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
frame2handlerMap.put(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
@@ -205,26 +200,14 @@
final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>
classToHandlerMap = _state2HandlersMap.get(currentState);
- if (classToHandlerMap == null)
- {
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
- return findStateTransitionHandler(null, frame);
- }
- final StateAwareMethodListener<B> handler = (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
+ final StateAwareMethodListener<B> handler = classToHandlerMap == null
+ ? null
+ : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
+
if (handler == null)
{
- if (currentState == null)
- {
- _logger.debug("No state transition handler defined for receiving frame " + frame);
- return null;
- }
- else
- {
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
- return findStateTransitionHandler(null, frame);
- }
+ _logger.debug("No state transition handler defined for receiving frame " + frame);
+ return null;
}
else
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Mon Jan 29 02:59:33 2007
@@ -27,11 +27,11 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.List;
+import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -87,7 +87,7 @@
}
}
- public void removeMessage(StoreContext context, long messageId)
+ public void removeMessage(StoreContext context, Long messageId)
{
if (_log.isDebugEnabled())
{
@@ -107,12 +107,12 @@
// Not required to do anything
}
- public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
{
// Not required to do anything
}
- public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
{
// Not required to do anything
}
@@ -142,36 +142,44 @@
return null;
}
- public long getNewMessageId()
+ public Long getNewMessageId()
{
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody)
+ public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody)
throws AMQException
{
List<ContentBody> bodyList = _contentBodyMap.get(messageId);
- if (bodyList == null)
+
+ if(bodyList == null && lastContentBody)
{
- bodyList = new ArrayList<ContentBody>();
- _contentBodyMap.put(messageId, bodyList);
+ _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
}
+ else
+ {
+ if (bodyList == null)
+ {
+ bodyList = new ArrayList<ContentBody>();
+ _contentBodyMap.put(messageId, bodyList);
+ }
- bodyList.add(index, contentBody);
+ bodyList.add(index, contentBody);
+ }
}
- public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData)
+ public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
throws AMQException
{
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
return _metaDataMap.get(messageId);
}
- public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
{
List<ContentBody> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Mon Jan 29 02:59:33 2007
@@ -50,15 +50,15 @@
*/
void close() throws Exception;
- void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
+ void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
void createQueue(AMQQueue queue) throws AMQException;
void removeQueue(AMQShortString name) throws AMQException;
- void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
- void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException;
+ void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
void beginTran(StoreContext context) throws AMQException;
@@ -79,14 +79,14 @@
* Return a valid, currently unused message id.
* @return a message id
*/
- long getNewMessageId();
+ Long getNewMessageId();
- void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException;
+ void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException;
- void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException;
+ void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
- MessageMetaData getMessageMetaData(long messageId) throws AMQException;
+ MessageMetaData getMessageMetaData(Long messageId) throws AMQException;
- ContentBody getContentBodyChunk(long messageId, int index) throws AMQException;
+ ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Mon Jan 29 02:59:33 2007
@@ -24,7 +24,7 @@
public abstract class AMQBody
{
- protected abstract byte getFrameType();
+ public abstract byte getFrameType();
/**
* Get the size of the body
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Mon Jan 29 02:59:33 2007
@@ -24,25 +24,29 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import java.util.HashMap;
import java.util.Map;
public class AMQDataBlockDecoder
{
- Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+ private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
- private final Map _supportedBodies = new HashMap();
+ private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
- private final static BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
static
{
- _bodiesSupported[AMQMethodBody.TYPE] = AMQMethodBodyFactory.getInstance();
_bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
_bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
_bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
}
+
+ Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+
+
+
public AMQDataBlockDecoder()
{
}
@@ -55,52 +59,57 @@
{
return false;
}
-
- final byte type = in.get();
- final int channel = in.getUnsignedShort();
+ in.skip(1 + 2);
final long bodySize = in.getUnsignedInt();
- // bodySize can be zero
- if (type <= 0 || channel < 0 || bodySize < 0)
- {
- throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
- " bodySize = " + bodySize);
- }
+
return (remainingAfterAttributes >= bodySize);
}
- private boolean isSupportedFrameType(byte frameType)
+
+ protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
- final boolean result = _bodiesSupported[frameType] != null;
+ final byte type = in.get();
+
+ BodyFactory bodyFactory;
+ if(type == AMQMethodBody.TYPE)
+ {
+ bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if(bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+
+ }
- if (!result)
+ }
+ else
{
- _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType);
+ bodyFactory = _bodiesSupported[type];
}
- return result;
- }
- protected Object createAndPopulateFrame(ByteBuffer in)
- throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- final byte type = in.get();
- BodyFactory bodyFactory = _bodiesSupported[type];
- if (!isSupportedFrameType(type))
+
+
+ if(bodyFactory == null)
{
throw new AMQFrameDecodingException("Unsupported frame type: " + type);
}
+
final int channel = in.getUnsignedShort();
final long bodySize = in.getUnsignedInt();
- /*
- if (bodyFactory == null)
+ // bodySize can be zero
+ if (channel < 0 || bodySize < 0)
{
- throw new AMQFrameDecodingException("Unsupported body type: " + type);
+ throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
+ " bodySize = " + bodySize);
}
- */
+
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
@@ -115,6 +124,6 @@
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
throws Exception
{
- out.write(createAndPopulateFrame(in));
+ out.write(createAndPopulateFrame(session, in));
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java Mon Jan 29 02:59:33 2007
@@ -28,17 +28,16 @@
import java.util.HashSet;
import java.util.Set;
+import java.util.Collections;
-public class AMQDataBlockEncoder implements MessageEncoder
+public final class AMQDataBlockEncoder implements MessageEncoder
{
- Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
+ private static final Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
- private Set _messageTypes;
+ private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class);
public AMQDataBlockEncoder()
{
- _messageTypes = new HashSet();
- _messageTypes.add(EncodableAMQDataBlock.class);
}
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Mon Jan 29 02:59:33 2007
@@ -24,59 +24,52 @@
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
- public int channel;
+ private final int _channel;
+
+ private final AMQBody _bodyFrame;
- public AMQBody bodyFrame;
- public AMQFrame()
- {
- }
- public AMQFrame(int channel, AMQBody bodyFrame)
+ public AMQFrame(final int channel, final AMQBody bodyFrame)
{
- this.channel = channel;
- this.bodyFrame = bodyFrame;
+ _channel = channel;
+ _bodyFrame = bodyFrame;
}
- public AMQFrame(ByteBuffer in, int channel, long bodySize, BodyFactory bodyFactory) throws AMQFrameDecodingException
+ public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
{
- this.channel = channel;
- this.bodyFrame = bodyFactory.createBody(in,bodySize);
+ this._channel = channel;
+ this._bodyFrame = bodyFactory.createBody(in,bodySize);
}
public long getSize()
{
- return 1 + 2 + 4 + bodyFrame.getSize() + 1;
+ return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
}
public void writePayload(ByteBuffer buffer)
{
- buffer.put(bodyFrame.getFrameType());
- // TODO: how does channel get populated
- EncodingUtils.writeUnsignedShort(buffer, channel);
- EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
- bodyFrame.writePayload(buffer);
+ buffer.put(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, _channel);
+ EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+ _bodyFrame.writePayload(buffer);
buffer.put((byte) 0xCE);
}
- /**
- *
- * @param buffer
- * @param channel unsigned short
- * @param bodySize unsigned integer
- * @param bodyFactory
- * @throws AMQFrameDecodingException
- */
- public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
- throws AMQFrameDecodingException, AMQProtocolVersionException
- {
- this.channel = channel;
- bodyFrame = bodyFactory.createBody(buffer, bodySize);
-
+ public final int getChannel()
+ {
+ return _channel;
}
+ public final AMQBody getBodyFrame()
+ {
+ return _bodyFrame;
+ }
+
+
+
public String toString()
{
- return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+ return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Mon Jan 29 02:59:33 2007
@@ -57,7 +57,7 @@
protected abstract void writeMethodPayload(ByteBuffer buffer);
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Mon Jan 29 02:59:33 2007
@@ -22,30 +22,21 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQMethodBodyFactory implements BodyFactory
{
private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+ private final AMQVersionAwareProtocolSession _protocolSession;
- private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory();
-
- public static AMQMethodBodyFactory getInstance()
- {
- return _instance;
- }
-
- private AMQMethodBodyFactory()
+ public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
{
- _log.debug("Creating method body factory");
+ _protocolSession = protocolSession;
}
public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML
- // segments generated together are now handled by MainRegistry. The Cluster class,
- // if generated together with amqp.xml is a part of MainRegistry.
- // TODO: Connect with version acquired from ProtocolInitiation class.
- return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(),
- (byte)8, (byte)0, in, bodySize);
+ return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Mon Jan 29 02:59:33 2007
@@ -6,4 +6,5 @@
public abstract interface AMQMethodBodyInstanceFactory
{
public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Mon Jan 29 02:59:33 2007
@@ -260,24 +260,14 @@
final AMQShortString otherString = (AMQShortString) o;
- if(otherString.length() != length())
- {
- return false;
- }
if((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
{
return false;
}
- final int size = length();
- for(int i = 0; i < size; i++)
- {
- if(_data.get(i) != otherString._data.get(i))
- {
- return false;
- }
- }
- return true;
+ return _data.equals(otherString._data);
+
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Mon Jan 29 02:59:33 2007
@@ -49,7 +49,7 @@
this.payload = payload;
}
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
@@ -98,9 +98,7 @@
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
- final AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = body;
+ final AMQFrame frame = new AMQFrame(channelId, body);
return frame;
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Mon Jan 29 02:59:33 2007
@@ -65,7 +65,7 @@
this.bodySize = bodySize;
}
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
@@ -113,17 +113,11 @@
public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
long bodySize)
{
- final AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize);
- return frame;
+ return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize));
}
public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body)
{
- final AMQFrame frame = new AMQFrame();
- frame.channel = channelId;
- frame.bodyFrame = body;
- return frame;
+ return new AMQFrame(channelId, body);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Mon Jan 29 02:59:33 2007
@@ -41,7 +41,7 @@
}
}
- protected byte getFrameType()
+ public byte getFrameType()
{
return TYPE;
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?view=auto&rev=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Mon Jan 29 02:59:33 2007
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+public class VersionSpecificRegistry
+{
+ private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class);
+
+
+ private final byte _protocolMajorVersion;
+ private final byte _protocolMinorVersion;
+
+ private static final int DEFAULT_MAX_CLASS_ID = 200;
+ private static final int DEFAULT_MAX_METHOD_ID = 50;
+
+ private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+
+ public VersionSpecificRegistry(byte major, byte minor)
+ {
+ _protocolMajorVersion = major;
+ _protocolMinorVersion = minor;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return _protocolMajorVersion;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return _protocolMinorVersion;
+ }
+
+ public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID)
+ {
+ try
+ {
+ return _registry[classID][methodID];
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ return null;
+ }
+ catch (NullPointerException e)
+ {
+ return null;
+ }
+ }
+
+ public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory)
+ {
+ if(_registry.length <= classID)
+ {
+ AMQMethodBodyInstanceFactory[][] oldRegistry = _registry;
+ _registry = new AMQMethodBodyInstanceFactory[classID+1][];
+ System.arraycopy(oldRegistry, 0, _registry, 0, oldRegistry.length);
+ }
+
+ if(_registry[classID] == null)
+ {
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID > DEFAULT_MAX_METHOD_ID ? methodID + 1 : DEFAULT_MAX_METHOD_ID + 1];
+ }
+ else if(_registry[classID].length <= methodID)
+ {
+ AMQMethodBodyInstanceFactory[] oldMethods = _registry[classID];
+ _registry[classID] = new AMQMethodBodyInstanceFactory[methodID+1];
+ System.arraycopy(oldMethods,0,_registry[classID],0,oldMethods.length);
+ }
+
+ _registry[classID][methodID] = instanceFactory;
+
+ }
+
+
+ public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size)
+ throws AMQFrameDecodingException
+ {
+ AMQMethodBodyInstanceFactory bodyFactory;
+ try
+ {
+ bodyFactory = _registry[classID][methodID];
+ }
+ catch(NullPointerException e)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+ catch(IndexOutOfBoundsException e)
+ {
+ if(classID >= _registry.length)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Class " + classID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ else
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+
+ }
+ }
+
+
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException(_log,
+ "Method " + methodID + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+ + " (while trying to decode class " + classID + " method " + methodID + ".");
+ }
+
+
+ return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+
+
+ }
+}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?view=auto&rev=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Mon Jan 29 02:59:33 2007
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.VersionSpecificRegistry;
+
+public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware
+{
+ public VersionSpecificRegistry getRegistry();
+}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java?view=auto&rev=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java Mon Jan 29 02:59:33 2007
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ProtocolVersionAware
+{
+ public byte getProtocolMinorVersion();
+
+ public byte getProtocolMajorVersion();
+}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Mon Jan 29 02:59:33 2007
@@ -26,7 +26,6 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -88,7 +87,7 @@
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
{
}