You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC
svn commit: r507672 [5/16] - in /incubator/qpid/branches/qpid.0-9:
gentools/src/org/apache/qpid/gentools/ gentools/templ.java/
gentools/templ.net/ java/ java/broker/ java/broker/bin/
java/broker/distribution/ java/broker/distribution/src/ java/broker/d...
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageEmptyBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageEmptyHandler implements StateAwareMethodListener<MessageEmptyBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageEmptyHandler.class);
+
private static MessageEmptyHandler _instance = new MessageEmptyHandler();
public static MessageEmptyHandler getInstance()
@@ -39,11 +41,8 @@
}
private MessageEmptyHandler() {}
-
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageEmptyBody> evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageEmptyBody> evt) throws AMQException
{
// TODO
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java Wed Feb 14 12:02:03 2007
@@ -21,16 +21,28 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
public class MessageGetHandler implements StateAwareMethodListener<MessageGetBody>
{
+ private static final Logger _logger = Logger.getLogger(MessageGetHandler.class);
+
private static MessageGetHandler _instance = new MessageGetHandler();
public static MessageGetHandler getInstance()
@@ -40,11 +52,85 @@
private MessageGetHandler() {}
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageGetBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageGetBody> evt) throws AMQException
{
- // TODO
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final MessageGetBody body = evt.getMethod();
+ final int channelId = evt.getChannelId();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ AMQChannel channel = session.getChannel(channelId);
+ if (channel == null)
+ {
+ _logger.error("Channel " + channelId + " not found");
+ // TODO: either alert or error that the channel was not found
+ }
+ else
+ {
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+
+ if (queue == null)
+ {
+ _logger.info("No queue for '" + body.queue + "'");
+ if(body.queue != null)
+ {
+ session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
+ new AMQShortString("No such queue, '" + body.queue + "'"));
+ }
+ else
+ {
+ session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ new AMQShortString("No queue name provided, no default queue defined."),
+ body.getClazz(), body.getMethod());
+ }
+ }
+ else
+ {
+ try
+ {
+ if(queue.performGet(session, channel, !body.noAck))
+ {
+ session.writeResponse(evt, MessageOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
+ }
+ else
+ {
+ session.writeResponse(evt, MessageEmptyBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
+ }
+ }
+ catch (AMQInvalidSelectorException ise)
+ {
+ _logger.info("Closing connection due to invalid selector: " + ise.getMessage());
+ session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
+ new AMQShortString(ise.getMessage()));
+ }
+// catch (ConsumerTagNotUniqueException e)
+// {
+// _logger.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage());
+// session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+// new AMQShortString("Non-unique consumer tag, '" + body.destination + "'"),
+// body.getClazz(), body.getMethod());
+// }
+ catch (AMQQueue.ExistingExclusiveSubscription e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer");
+ }
+ }
+ }
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageOffsetBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageOffsetHandler implements StateAwareMethodListener<MessageOffsetBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageOffsetHandler.class);
+
private static MessageOffsetHandler _instance = new MessageOffsetHandler();
public static MessageOffsetHandler getInstance()
@@ -40,10 +42,7 @@
private MessageOffsetHandler() {}
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageOffsetBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageOffsetBody> evt) throws AMQException
{
// TODO
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageOkHandler implements StateAwareMethodListener<MessageOkBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageOkHandler.class);
+
private static MessageOkHandler _instance = new MessageOkHandler();
public static MessageOkHandler getInstance()
@@ -40,10 +42,7 @@
private MessageOkHandler() {}
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageOkBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageOkBody> evt) throws AMQException
{
// TODO
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java Wed Feb 14 12:02:03 2007
@@ -22,15 +22,19 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageOpenHandler implements StateAwareMethodListener<MessageOpenBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageOpenHandler.class);
+
private static MessageOpenHandler _instance = new MessageOpenHandler();
public static MessageOpenHandler getInstance()
@@ -39,13 +43,15 @@
}
private MessageOpenHandler() {}
-
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageOpenBody> evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageOpenBody> evt) throws AMQException
{
- // TODO
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.addMessageOpen(evt.getMethod());
+ session.writeResponse(evt, MessageOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java Wed Feb 14 12:02:03 2007
@@ -24,14 +24,16 @@
import org.apache.qpid.framing.MessageQosBody;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageQosHandler implements StateAwareMethodListener<MessageQosBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageQosHandler.class);
+
private static MessageQosHandler _instance = new MessageQosHandler();
public static MessageQosHandler getInstance()
@@ -41,16 +43,14 @@
private MessageQosHandler() {}
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageQosBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageQosBody> evt) throws AMQException
{
- protocolSession.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), new MessageOkBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
+ session.writeResponse(evt.getChannelId(), evt.getRequestId(), MessageOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java Wed Feb 14 12:02:03 2007
@@ -20,19 +20,17 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageRecoverBody;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
public class MessageRecoverHandler implements StateAwareMethodListener<MessageRecoverBody>
{
private static final Logger _logger = Logger.getLogger(MessageRecoverHandler.class);
@@ -46,24 +44,28 @@
private MessageRecoverHandler() {}
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageRecoverBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageRecoverBody> evt) throws AMQException
{
- _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
if (channel == null)
{
throw new AMQException("Unknown channel " + evt.getChannelId());
}
MessageRecoverBody body = evt.getMethod();
- if (body.requeue) {
+ if (body.requeue)
+ {
channel.requeue();
- } else {
- channel.resend(protocolSession);
}
- MessageOkBody response = MessageOkBody.createMethodBody(protocolSession.getMajor(), protocolSession.getMinor());
- protocolSession.writeResponse(evt, response);
+ else
+ {
+ channel.resend(session, false);
+ }
+ MessageOkBody response = MessageOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion()); // AMQP minor version
+ session.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageRejectBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageRejectHandler implements StateAwareMethodListener<MessageRejectBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageRejectHandler.class);
+
private static MessageRejectHandler _instance = new MessageRejectHandler();
public static MessageRejectHandler getInstance()
@@ -39,11 +41,8 @@
}
private MessageRejectHandler() {}
-
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageRejectBody> evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageRejectBody> evt) throws AMQException
{
// TODO
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageResumeBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageResumeHandler implements StateAwareMethodListener<MessageResumeBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageResumeHandler.class);
+
private static MessageResumeHandler _instance = new MessageResumeHandler();
public static MessageResumeHandler getInstance()
@@ -40,10 +42,7 @@
private MessageResumeHandler() {}
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageResumeBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageResumeBody> evt) throws AMQException
{
// TODO
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java Wed Feb 14 12:02:03 2007
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.qpid.framing.MessageOkBody;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody>
{
@@ -47,50 +47,41 @@
return _instance;
}
- private static final String UNKNOWN_EXCHANGE_NAME = "Unknown exchange name";
-
private MessageTransferHandler() {}
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageTransferBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageTransferBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final MessageTransferBody body = evt.getMethod();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- if (_log.isDebugEnabled()) {
+ if (_log.isDebugEnabled())
+ {
_log.debug("Publish received on channel " + evt.getChannelId());
}
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
- if (body.destination == null) {
+ if (body.destination == null)
+ {
body.destination = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
}
- Exchange e = protocolSession.getExchangeRegistry().getExchange(body.destination);
+ Exchange e = exchangeRegistry.getExchange(body.destination);
// if the exchange does not exist we raise a channel exception
- if (e == null) {
-// protocolSession.closeChannel(evt.getChannelId());
-// // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
-// // then we can remove the hardcoded 0,0
-// // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-// // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-// // Be aware of possible changes to parameter order as versions change.
-// AMQMethodBody cf = ChannelCloseBody.createMethodBody
-// ((byte)0, (byte)9, // AMQP version (major, minor)
-// MessageTransferBody.getClazz((byte)0, (byte)9), // classId
-// MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
-// 500, // replyCode
-// UNKNOWN_EXCHANGE_NAME); // replyText
-// protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
- protocolSession.closeChannelRequest(evt.getChannelId(), 500, UNKNOWN_EXCHANGE_NAME);
- } else {
+ if (e == null)
+ {
+ session.closeChannelRequest(evt.getChannelId(), 500, new AMQShortString("Unknown exchange name"));
+ }
+ else
+ {
// The partially populated BasicDeliver frame plus the received route body
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
- channel.addMessageTransfer(body, protocolSession);
- protocolSession.writeResponse(evt, MessageOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.addMessageTransfer(body, session);
+ session.writeResponse(evt, MessageOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Wed Feb 14 12:02:03 2007
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.QueueBindBody;
@@ -34,6 +33,9 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
@@ -46,18 +48,21 @@
return _instance;
}
- private QueueBindHandler()
- {
- }
+ private QueueBindHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final QueueBindBody body = evt.getMethod();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
final AMQQueue queue;
if (body.queue == null)
{
- queue = protocolSession.getChannel(evt.getChannelId()).getDefaultQueue();
+ queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
if (queue == null)
{
throw new AMQException("No default queue defined on channel and queue was null");
@@ -69,14 +74,14 @@
}
else
{
- queue = protocolSession.getQueueRegistry().getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if (queue == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
}
- final Exchange exch = protocolSession.getExchangeRegistry().getExchange(body.exchange);
+ final Exchange exch = exchangeRegistry.getExchange(body.exchange);
if (exch == null)
{
throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
@@ -91,9 +96,9 @@
{
// Be aware of possible changes to parameter order as versions change.
final AMQMethodBody response = QueueBindOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor()); // AMQP minor version
- protocolSession.writeResponse(evt, response);
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion()); // AMQP minor version
+ session.writeResponse(evt, response);
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed Feb 14 12:02:03 2007
@@ -20,24 +20,27 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,28 +61,27 @@
private final AtomicInteger _counter = new AtomicInteger();
- private final MessageStore _store;
-
protected QueueDeclareHandler()
{
Configurator.configure(this);
- _store = ApplicationRegistry.getInstance().getMessageStore();
}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
- QueueDeclareBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final QueueDeclareBody body = evt.getMethod();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
// if we aren't given a queue name, we create one which we return to the client
if (body.queue == null)
{
body.queue = createName();
}
- //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
AMQQueue queue = null;
- QueueRegistry queueRegistry = protocolSession.getQueueRegistry();
synchronized (queueRegistry)
{
if ((queue = queueRegistry.getQueue(body.queue)) == null)
@@ -92,22 +94,22 @@
}
else
{
- queue = createQueue(body, queueRegistry, protocolSession);
+ queue = createQueue(body, virtualHost, session);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _store.createQueue(queue);
+ store.createQueue(queue);
}
queueRegistry.registerQueue(queue);
if (autoRegister)
{
- Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct");
+ Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
defaultExchange.registerQueue(body.queue, queue, null);
queue.bind(body.queue, defaultExchange);
_log.info("Queue " + body.queue + " bound to default exchange");
}
}
}
- else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+ else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
// todo - constant
throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");
@@ -118,25 +120,26 @@
_log.info("Queue " + body.queue + " exists and is accesible to this connection [owner=" + queue.getOwner() +"]");
}
//set this as the default queue on the channel:
- protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+ session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
}
+
if (!body.nowait)
{
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody response = QueueDeclareOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor(), // AMQP minor version
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
queue.getConsumerCount(), // consumerCount
queue.getMessageCount(), // messageCount
body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
- protocolSession.writeResponse(evt, response);
+ session.writeResponse(evt, response);
}
}
- protected String createName()
+ protected AMQShortString createName()
{
- return "tmp_" + pad(_counter.incrementAndGet());
+ return new AMQShortString("tmp_" + pad(_counter.incrementAndGet()));
}
protected static String pad(int value)
@@ -144,13 +147,13 @@
return MessageFormat.format("{0,number,0000000000000}", value);
}
- protected AMQQueue createQueue(QueueDeclareBody body, final QueueRegistry registry, final AMQProtocolSession session)
+ protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session)
throws AMQException
{
- String owner = body.exclusive ? session.getContextKey() : null;
- if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner);
- final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
- final String queueName = queue.getName();
+ final QueueRegistry registry = virtualHost.getQueueRegistry();
+ AMQShortString owner = body.exclusive ? session.getContextKey() : null;
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ final AMQShortString queueName = queue.getName();
if(body.exclusive && !body.durable)
{
@@ -177,7 +180,6 @@
session.removeSessionCloseTask(deleteQueueTask);
}
});
-
}
return queue;
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed Feb 14 12:02:03 2007
@@ -20,21 +20,25 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
{
+ //private static final Logger _log = Logger.getLogger(QueueDeleteHandler.class);
+
private static final QueueDeleteHandler _instance = new QueueDeleteHandler();
public static QueueDeleteHandler getInstance()
@@ -43,7 +47,6 @@
}
private final boolean _failIfNotFound;
- private final MessageStore _store;
public QueueDeleteHandler()
{
@@ -53,12 +56,15 @@
public QueueDeleteHandler(boolean failIfNotFound)
{
_failIfNotFound = failIfNotFound;
- _store = ApplicationRegistry.getInstance().getMessageStore();
-
}
- public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MessageStore store = virtualHost.getMessageStore();
+
QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
if(body.queue == null)
@@ -67,7 +73,7 @@
}
else
{
- queue = session.getQueueRegistry().getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.queue);
}
if(queue == null)
@@ -92,12 +98,12 @@
else
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
- _store.removeQueue(queue.getName());
+ store.removeQueue(queue.getName());
// Be aware of possible changes to parameter order as versions change.
session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
- session.getMajor(), // AMQP major version
- session.getMinor(), // AMQP minor version
- purged)); // messageCount
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
+ purged)); // messageCount
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Wed Feb 14 12:02:03 2007
@@ -1,20 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.handler;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
+ //private static final Logger _log = Logger.getLogger(QueuePurgeHandler.class);
+
private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
public static QueuePurgeHandler getInstance()
@@ -34,9 +61,11 @@
_failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
{
- QueueRegistry queueRegistry = session.getQueueRegistry();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
@@ -66,15 +95,16 @@
}
else
{
- long purged = queue.clearQueue();
-
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ long purged = queue.clearQueue(channel.getStoreContext());
if(!body.nowait)
{
- AMQMethodBody response
- = QueuePurgeOkBody.createMethodBody(session.getMajor(), session.getMinor(), purged);
+ AMQMethodBody response = QueuePurgeOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
+ purged);
session.writeResponse(evt, response);
-
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Wed Feb 14 12:02:03 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,14 +25,16 @@
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
+ private static final Logger _log = Logger.getLogger(TxCommitHandler.class);
+
private static TxCommitHandler _instance = new TxCommitHandler();
public static TxCommitHandler getInstance()
@@ -44,19 +46,26 @@
{
}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<TxCommitBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
- try{
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ try
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Commit received on channel " + evt.getChannelId());
+ }
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.commit();
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
- channel.processReturns(protocolSession);
- }catch(AMQException e){
+ session.writeResponse(evt, TxCommitOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
+ channel.processReturns(session);
+ }
+ catch(AMQException e)
+ {
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Wed Feb 14 12:02:03 2007
@@ -24,15 +24,17 @@
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.AMQChannel;
+
+//import org.apache.log4j.Logger;
public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
{
+ //private static final Logger _log = Logger.getLogger(TxRollbackHandler.class);
+
private static TxRollbackHandler _instance = new TxRollbackHandler();
public static TxRollbackHandler getInstance()
@@ -44,20 +46,24 @@
{
}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
{
- try{
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ try
+ {
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.rollback();
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
+ session.writeResponse(evt, TxRollbackOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
- channel.resend(protocolSession);
- }catch(AMQException e){
+ channel.resend(session, false);
+ }
+ catch(AMQException e)
+ {
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Wed Feb 14 12:02:03 2007
@@ -24,14 +24,16 @@
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
{
+ //private static final Logger _log = Logger.getLogger(TxSelectHandler.class);
+
private static TxSelectHandler _instance = new TxSelectHandler();
public static TxSelectHandler getInstance()
@@ -43,13 +45,14 @@
{
}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<TxSelectBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
- protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
+ AMQProtocolSession session = stateManager.getProtocolSession();
+
+ session.getChannel(evt.getChannelId()).setLocalTransactional();
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
+ session.writeResponse(evt, TxSelectOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Wed Feb 14 12:02:03 2007
@@ -67,7 +67,7 @@
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this);
+ getManagedObjectRegistry().registerObject(this);
}
catch (JMException e)
{
@@ -75,11 +75,16 @@
}
}
+ protected ManagedObjectRegistry getManagedObjectRegistry()
+ {
+ return ApplicationRegistry.getInstance().getManagedObjectRegistry();
+ }
+
public void unregister() throws AMQException
{
try
{
- ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this);
+ getManagedObjectRegistry().unregisterObject(this);
}
catch (JMException e)
{
@@ -91,14 +96,14 @@
{
return getObjectInstanceName() + "[" + getType() + "]";
}
+
/**
* Created the ObjectName as per the JMX Specs
* @return ObjectName
* @throws MalformedObjectNameException
*/
- public ObjectName getObjectName()
- throws MalformedObjectNameException
+ public ObjectName getObjectName() throws MalformedObjectNameException
{
String name = getObjectInstanceName();
StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
@@ -113,26 +118,41 @@
return new ObjectName(objectName.toString());
}
- private String getHierarchicalType(ManagedObject obj)
+ protected ObjectName getObjectNameForSingleInstanceMBean() throws MalformedObjectNameException
+ {
+ StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
+
+ objectName.append(":type=");
+ objectName.append(getHierarchicalType(this));
+
+ String hierarchyName = getHierarchicalName(this);
+ if (hierarchyName != null)
+ {
+ objectName.append(",");
+ objectName.append(hierarchyName.substring(0, hierarchyName.lastIndexOf(",")));
+ }
+
+ return new ObjectName(objectName.toString());
+ }
+
+ protected String getHierarchicalType(ManagedObject obj)
{
- String parentType = null;
if (obj.getParentObject() != null)
{
- parentType = getHierarchicalType(obj.getParentObject()).toString();
+ String parentType = getHierarchicalType(obj.getParentObject()).toString();
return parentType + "." + obj.getType();
}
else
return obj.getType();
}
- private String getHierarchicalName(ManagedObject obj)
+ protected String getHierarchicalName(ManagedObject obj)
{
- String parentName = null;
if (obj.getParentObject() != null)
{
- parentName = obj.getParentObject().getType() + "=" +
- obj.getParentObject().getObjectInstanceName() + ","+
- getHierarchicalName(obj.getParentObject());
+ String parentName = obj.getParentObject().getType() + "=" +
+ obj.getParentObject().getObjectInstanceName() + ","+
+ getHierarchicalName(obj.getParentObject());
return parentName;
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java Wed Feb 14 12:02:03 2007
@@ -37,7 +37,7 @@
*/
public interface ManagedBroker
{
- static final String TYPE = "BrokerManager";
+ static final String TYPE = "VirtualHostManager";
/**
* Creates a new Exchange.
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Wed Feb 14 12:02:03 2007
@@ -23,11 +23,14 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ConnectionStartBody;
@@ -49,14 +52,16 @@
import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.MainRegistry;
+import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -65,6 +70,9 @@
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -76,7 +84,7 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQMinaProtocolSession implements AMQProtocolSession,
ProtocolVersionList,
@@ -86,19 +94,26 @@
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
+ // to save boxing the channelId and looking up in a map... cache in an array the low numbered
+ // channels. This value must be of the form 2^x - 1.
+ private static final int CHANNEL_CACHE_SIZE = 0xff;
+
private final IoSession _minaProtocolSession;
- private String _contextKey;
+ private AMQShortString _contextKey;
+
+ private VirtualHost _virtualHost;
private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+ private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1];
+
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
private final AMQStateManager _stateManager;
- private final QueueRegistry _queueRegistry;
-
- private final ExchangeRegistry _exchangeRegistry;
+ private ExchangeRegistry _exchangeRegistry;
+ private MessageStore _messageStore;
private AMQCodecFactory _codecFactory;
@@ -118,15 +133,15 @@
private long _maxFrameSize = 65536;
/* AMQP Version for this session */
- private byte _major;
- private byte _minor;
+ private byte _major = pv[pv.length-1][PROTOCOL_MAJOR];
+ private byte _minor = pv[pv.length-1][PROTOCOL_MINOR];
private FieldTable _clientProperties;
-
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
// Keeps a tally of connections for logging and debugging
- private static AtomicInteger _ConnectionId;
- static { _ConnectionId = new AtomicInteger(0); }
+ private static AtomicLong _ConnectionId;
+ static { _ConnectionId = new AtomicLong(0L); }
public ManagedObject getManagedObject()
{
@@ -134,26 +149,41 @@
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory)
throws AMQException
{
_ConnectionId.incrementAndGet();
- _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+ _stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
- _frameListeners.add(_stateManager);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
_codecFactory = codecFactory;
- _managedObject = createMBean();
- _managedObject.register();
+ _exchangeRegistry = null;
+ _messageStore = null;
+
_closePending = false;
_closed = false;
- createChannel(0);
+ createChannel(0); // Required to handle requests / responses for channel 0
+
+ _frameListeners.add(_stateManager);
+ _managedObject = createMBean();
+ _managedObject.register();
+
+ try
+ {
+ IoServiceConfig config = session.getServiceConfig();
+ ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ // throw e;
+ }
}
- public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
AMQCodecFactory codecFactory, AMQStateManager stateManager)
throws AMQException
{
@@ -161,15 +191,14 @@
_stateManager = stateManager;
_minaProtocolSession = session;
session.setAttachment(this);
- _frameListeners.add(_stateManager);
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+
_codecFactory = codecFactory;
- _managedObject = createMBean();
- _managedObject.register();
+ _exchangeRegistry = null;
+ _messageStore = null;
+
_closePending = false;
_closed = false;
- createChannel(0);
+ createChannel(0); // Required to handle requests / responses for channel 0
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -197,8 +226,7 @@
private AMQChannel createChannel(int id) throws AMQException
{
- IApplicationRegistry registry = ApplicationRegistry.getInstance();
- AMQChannel channel = new AMQChannel(id, this, registry.getMessageStore(),
+ AMQChannel channel = new AMQChannel(id, this, _messageStore,
_exchangeRegistry, _stateManager);
addChannel(channel);
return channel;
@@ -217,10 +245,12 @@
try
{
pi.checkVersion(this); // Fails if not correct
+
// This sets the protocol version (and hence framing classes) for this session.
- _major = pi.protocolMajor;
- _minor = pi.protocolMinor;
+ setProtocolVersion(pi.protocolMajor,pi.protocolMinor);
+
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody
@@ -251,15 +281,15 @@
else if(!_closed)
{
AMQFrame frame = (AMQFrame) message;
- AMQChannel channel = getChannel(frame.channel);
+ AMQChannel channel = getChannel(frame.getChannel());
if (_closePending)
{
// If a close is pending (ie ChannelClose has been sent, but no ChannelCloseOk received), then
// all methods except ChannelCloseOk must be rejected. (AMQP spec)
- if((frame.bodyFrame instanceof AMQRequestBody))
+ if((frame.getBodyFrame() instanceof AMQRequestBody))
throw new AMQException("Incoming request frame on connection which is pending close.");
- AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+ AMQRequestBody requestBody = (AMQRequestBody)frame.getBodyFrame();
if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody))
throw new AMQException("Incoming frame on closing connection is not a Connection.CloseOk method.");
}
@@ -271,9 +301,9 @@
// b. Have a request id of 1 (i.e. the first request on a new channel);
// c. Must be a ConnectionOpenBody method.
// Throw an exception for all other incoming frames on an unopened channel
- if(!(frame.bodyFrame instanceof AMQRequestBody))
+ if(!(frame.getBodyFrame() instanceof AMQRequestBody))
throw new AMQException("Incoming frame on unopened channel is not a request.");
- AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+ AMQRequestBody requestBody = (AMQRequestBody)frame.getBodyFrame();
if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody)) {
closeSessionRequest(
requestBody.getMethodPayload().getConnectionException(
@@ -283,16 +313,16 @@
}
if (requestBody.getRequestId() != 1)
throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1.");
- channel = createChannel(frame.channel);
+ channel = createChannel(frame.getChannel());
}
- if (frame.bodyFrame instanceof AMQRequestBody)
+ if (frame.getBodyFrame() instanceof AMQRequestBody)
{
- requestFrameReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
+ requestFrameReceived(frame.getChannel(), (AMQRequestBody)frame.getBodyFrame());
}
- else if (frame.bodyFrame instanceof AMQResponseBody)
+ else if (frame.getBodyFrame() instanceof AMQResponseBody)
{
- responseFrameReceived(frame.channel, (AMQResponseBody)frame.bodyFrame);
+ responseFrameReceived(frame.getChannel(), (AMQResponseBody)frame.getBodyFrame());
}
else
{
@@ -303,10 +333,11 @@
private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws Exception
{
- try{
+ try
+ {
if (_logger.isDebugEnabled())
{
- _logger.debug("Request frame received: " + requestBody);
+ _logger.debug("Request frame received: " + requestBody);
}
AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
@@ -383,12 +414,12 @@
_minaProtocolSession.write(frame);
}
- public String getContextKey()
+ public AMQShortString getContextKey()
{
return _contextKey;
}
- public void setContextKey(String contextKey)
+ public void setContextKey(AMQShortString contextKey)
{
_contextKey = contextKey;
}
@@ -400,7 +431,9 @@
public AMQChannel getChannel(int channelId)
{
- return _channelMap.get(channelId);
+ return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ ? _cachedChannels[channelId]
+ : _channelMap.get(channelId);
}
public void addChannel(AMQChannel channel)
@@ -410,7 +443,13 @@
throw new IllegalStateException("Session is closed");
}
- _channelMap.put(channel.getChannelId(), channel);
+ final int channelId = channel.getChannelId();
+ _channelMap.put(channelId, channel);
+
+ if(((channelId & CHANNEL_CACHE_SIZE) == channelId))
+ {
+ _cachedChannels[channelId] = channel;
+ }
checkForNotification();
}
@@ -450,7 +489,7 @@
}
// Used to initiate a channel close from the server side and inform the client
- public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+ public void closeChannelRequest(int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
final AMQChannel channel = _channelMap.get(channelId);
if (channel == null)
@@ -481,7 +520,7 @@
// Used to close a channel as a response to a client close request
public void closeChannelResponse(int channelId, long requestId) throws AMQException
{
- final AMQChannel channel = _channelMap.get(channelId);
+ final AMQChannel channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
@@ -497,13 +536,14 @@
}
finally
{
- _channelMap.remove(channelId);
+ removeChannel(channelId);
+
}
}
}
// Used to initiate a connection close from the server side and inform the client
- public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException
+ public void closeSessionRequest(int replyCode, AMQShortString replyText, int classId, int methodId) throws AMQException
{
_closePending = true; // This prevents all methods except Close-Ok from being accepted
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
@@ -523,7 +563,7 @@
closeSession();
}
- public void closeSessionRequest(int replyCode, String replyText) throws AMQException
+ public void closeSessionRequest(int replyCode, AMQShortString replyText) throws AMQException
{
closeSessionRequest(replyCode, replyText, 0, 0);
}
@@ -531,7 +571,7 @@
public void closeSessionRequest(AMQConnectionException e) throws AMQException
{
- closeSessionRequest(e.getErrorCode(), e.getMessage(), e.getClassId(), e.getMethodId());
+ closeSessionRequest(e.getErrorCode(), new AMQShortString(e.getMessage()), e.getClassId(), e.getMethodId());
}
@@ -569,6 +609,10 @@
public void removeChannel(int channelId)
{
_channelMap.remove(channelId);
+ if((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ {
+ _cachedChannels[channelId] = null;
+ }
}
/**
@@ -614,6 +658,10 @@
channel.close(this);
}
_channelMap.clear();
+ for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
+ {
+ _cachedChannels[i]=null;
+ }
}
public String toString()
@@ -679,52 +727,46 @@
_clientProperties = clientProperties;
if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
{
- setContextKey(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE));
+ setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
}
}
- public QueueRegistry getQueueRegistry()
- {
- return _queueRegistry;
- }
-
- public ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
public AMQStateManager getStateManager()
{
return _stateManager;
}
- /**
- * Convenience methods for managing AMQP version.
- * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
- */
+ private void setProtocolVersion(byte major, byte minor)
+ {
+ _major = major;
+ _minor = minor;
+ _registry = MainRegistry.getVersionSpecificRegistry(major,minor);
+ }
- public byte getMajor()
+ public byte getProtocolMajorVersion()
{
return _major;
}
- public byte getMinor()
+ public byte getProtocolMinorVersion()
{
return _minor;
}
- public boolean versionEquals(byte major, byte minor)
+ public boolean isProtocolVersionEqual(byte major, byte minor)
{
return _major == major && _minor == minor;
}
- public void checkMethodBodyVersion(AMQMethodBody methodBody) {
- if (!versionEquals(methodBody.getMajor(), methodBody.getMinor())) {
+ public void checkMethodBodyVersion(AMQMethodBody methodBody)
+ {
+ if (!isProtocolVersionEqual(methodBody.getMajor(), methodBody.getMinor()))
+ {
throw new RuntimeException("MethodBody version did not match version of current session.");
}
}
- public int getConnectionId()
+ public long getConnectionId()
{
return _ConnectionId.get();
}
@@ -742,5 +784,25 @@
public void removeSessionCloseTask(Task task)
{
_taskList.remove(task);
+ }
+
+ public VersionSpecificRegistry getRegistry()
+ {
+ return _registry;
+ }
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost) throws AMQException
+ {
+ _virtualHost = virtualHost;
+ _exchangeRegistry = virtualHost.getExchangeRegistry();
+ _messageStore = virtualHost.getMessageStore();
+ _managedObject = createMBean();
+ _managedObject.register();
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Wed Feb 14 12:02:03 2007
@@ -54,41 +54,26 @@
{
private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
- /**
- * The registry of all queues. This is passed to frame listeners when frame
- * events occur.
- */
- private final QueueRegistry _queueRegistry;
+ private final IApplicationRegistry _applicationRegistry;
- /**
- * The registry of all exchanges. This is passed to frame listeners when frame
- * events occur.
- */
- private final ExchangeRegistry _exchangeRegistry;
private boolean _useSSL;
public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
{
- IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance);
-
- _queueRegistry = registry.getQueueRegistry();
- _exchangeRegistry = registry.getExchangeRegistry();
- _logger.debug("AMQPFastProtocolHandler created");
+ this(ApplicationRegistry.getInstance(applicationRegistryInstance));
}
- public AMQPFastProtocolHandler(QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry)
+ public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
{
- _queueRegistry = queueRegistry;
- _exchangeRegistry = exchangeRegistry;
+ _applicationRegistry = applicationRegistry;
_logger.debug("AMQPFastProtocolHandler created");
}
protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
{
- this(handler._queueRegistry, handler._exchangeRegistry);
+ this(handler._applicationRegistry);
}
public void sessionCreated(IoSession protocolSession) throws Exception
@@ -96,7 +81,7 @@
SessionUtil.initialize(protocolSession);
final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory);
+ createSession(protocolSession, _applicationRegistry, codecFactory);
_logger.info("Protocol session created");
final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
@@ -121,9 +106,9 @@
/**
* Separated into its own, protected, method to allow easier reuse
*/
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
{
- new AMQMinaProtocolSession(session, queues, exchanges, codec);
+ new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
}
public void sessionOpened(IoSession protocolSession) throws Exception
@@ -135,7 +120,11 @@
{
_logger.info("Protocol Session closed");
final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- amqProtocolSession.closeSession();
+ //fixme -- this can be null
+ if(amqProtocolSession != null)
+ {
+ amqProtocolSession.closeSession();
+ }
}
public void sessionIdle(IoSession session, IdleStatus status) throws Exception
@@ -154,8 +143,7 @@
}
- public void exceptionCaught(IoSession protocolSession, AMQMethodListener methodListener,
- Throwable throwable) throws Exception
+ public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
{
AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
if (throwable instanceof AMQProtocolHeaderException)
@@ -176,7 +164,7 @@
{
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
// TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right!
- session.closeSessionRequest(200, throwable.getMessage());
+ session.closeSessionRequest(200, new AMQShortString(throwable.getMessage()));
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java Wed Feb 14 12:02:03 2007
@@ -42,8 +42,7 @@
public AMQPProtocolProvider()
{
IApplicationRegistry registry = ApplicationRegistry.getInstance();
- _handler = new AMQPFastProtocolHandler(registry.getQueueRegistry(),
- registry.getExchangeRegistry());
+ _handler = new AMQPFastProtocolHandler(registry);
}
public AMQPFastProtocolHandler getHandler()