You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC

svn commit: r507672 [5/16] - in /incubator/qpid/branches/qpid.0-9: gentools/src/org/apache/qpid/gentools/ gentools/templ.java/ gentools/templ.net/ java/ java/broker/ java/broker/bin/ java/broker/distribution/ java/broker/distribution/src/ java/broker/d...

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageEmptyBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageEmptyHandler implements StateAwareMethodListener<MessageEmptyBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageEmptyHandler.class);
+
     private static MessageEmptyHandler _instance = new MessageEmptyHandler();
 
     public static MessageEmptyHandler getInstance()
@@ -39,11 +41,8 @@
     }
 
     private MessageEmptyHandler() {}
-    
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageEmptyBody> evt)
-                                throws AMQException
+      
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageEmptyBody> evt) throws AMQException
     {
 		// TODO
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java Wed Feb 14 12:02:03 2007
@@ -21,16 +21,28 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
 
 public class MessageGetHandler implements StateAwareMethodListener<MessageGetBody>
 {
+    private static final Logger _logger = Logger.getLogger(MessageGetHandler.class);
+
     private static MessageGetHandler _instance = new MessageGetHandler();
 
     public static MessageGetHandler getInstance()
@@ -40,11 +52,85 @@
 
     private MessageGetHandler() {}
     
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageGetBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageGetBody> evt) throws AMQException
     {
-		// TODO
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final MessageGetBody body = evt.getMethod();
+        final int channelId = evt.getChannelId();
+        VirtualHost virtualHost = session.getVirtualHost();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        
+        AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            _logger.error("Channel " + channelId + " not found");
+            // TODO: either alert or error that the channel was not found; currently this is only logged and no response is sent
+        }
+        else
+        {
+            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+
+            if (queue == null)
+            {
+                _logger.info("No queue for '" + body.queue + "'");
+                if(body.queue != null)
+                {
+                    session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
+                        new AMQShortString("No such queue, '" + body.queue + "'"));
+                }
+                else
+                {
+                    session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                        new AMQShortString("No queue name provided, no default queue defined."),
+                        body.getClazz(), body.getMethod());
+                }
+            }
+            else
+            {
+                try
+                {
+                    if(queue.performGet(session, channel, !body.noAck))
+                    {
+                        session.writeResponse(evt, MessageOkBody.createMethodBody(
+                            session.getProtocolMajorVersion(), // AMQP major version
+                            session.getProtocolMinorVersion())); // AMQP minor version
+                    }
+                    else
+                    {
+                        session.writeResponse(evt, MessageEmptyBody.createMethodBody(
+                            session.getProtocolMajorVersion(), // AMQP major version
+                            session.getProtocolMinorVersion())); // AMQP minor version
+                    }
+                }
+                catch (AMQInvalidSelectorException ise)
+                {
+                    _logger.info("Closing connection due to invalid selector: " + ise.getMessage());
+                    session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
+                        new AMQShortString(ise.getMessage()));
+                }
+//                 catch (ConsumerTagNotUniqueException e)
+//                 {
+//                     _logger.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage());
+//                     session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+//                         new AMQShortString("Non-unique consumer tag, '" + body.destination + "'"),
+//                         body.getClazz(), body.getMethod());
+//                 }
+                catch (AMQQueue.ExistingExclusiveSubscription e)
+                {
+                    throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+                                                  "Cannot subscribe to queue "
+                                                          + queue.getName()
+                                                          + " as it already has an existing exclusive consumer");
+                }
+                catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+                {
+                    throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+                                                   "Cannot subscribe to queue "
+                                                   + queue.getName()
+                                                   + " exclusively as it already has a consumer");
+                }
+            }
+        }
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageOffsetBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageOffsetHandler implements StateAwareMethodListener<MessageOffsetBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageOffsetHandler.class);
+
     private static MessageOffsetHandler _instance = new MessageOffsetHandler();
 
     public static MessageOffsetHandler getInstance()
@@ -40,10 +42,7 @@
 
     private MessageOffsetHandler() {}
     
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageOffsetBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageOffsetBody> evt) throws AMQException
     {
 		// TODO
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageOkHandler implements StateAwareMethodListener<MessageOkBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageOkHandler.class);
+
     private static MessageOkHandler _instance = new MessageOkHandler();
 
     public static MessageOkHandler getInstance()
@@ -40,10 +42,7 @@
 
     private MessageOkHandler() {}
     
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageOkBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageOkBody> evt) throws AMQException
     {
 		// TODO
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java Wed Feb 14 12:02:03 2007
@@ -22,15 +22,19 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageOpenHandler implements StateAwareMethodListener<MessageOpenBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageOpenHandler.class);
+
     private static MessageOpenHandler _instance = new MessageOpenHandler();
 
     public static MessageOpenHandler getInstance()
@@ -39,13 +43,15 @@
     }
 
     private MessageOpenHandler() {}
-    
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageOpenBody> evt)
-                                throws AMQException
+        
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageOpenBody> evt) throws AMQException
     {
-		// TODO
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        AMQChannel channel = session.getChannel(evt.getChannelId());
+        channel.addMessageOpen(evt.getMethod());
+        session.writeResponse(evt, MessageOkBody.createMethodBody(
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion())); // AMQP minor version
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java Wed Feb 14 12:02:03 2007
@@ -24,14 +24,16 @@
 import org.apache.qpid.framing.MessageQosBody;
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageQosHandler implements StateAwareMethodListener<MessageQosBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageQosHandler.class);
+
     private static MessageQosHandler _instance = new MessageQosHandler();
 
     public static MessageQosHandler getInstance()
@@ -41,16 +43,14 @@
 
     private MessageQosHandler() {}
     
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageQosBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageQosBody> evt) throws AMQException
     {
-        protocolSession.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
         // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), new MessageOkBody(
-            protocolSession.getMajor(), // AMQP major version
-            protocolSession.getMinor())); // AMQP minor version
+        session.writeResponse(evt.getChannelId(), evt.getRequestId(), MessageOkBody.createMethodBody(
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion())); // AMQP minor version
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java Wed Feb 14 12:02:03 2007
@@ -20,19 +20,17 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageRecoverBody;
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+import org.apache.log4j.Logger;
+
 public class MessageRecoverHandler implements StateAwareMethodListener<MessageRecoverBody>
 {
     private static final Logger _logger = Logger.getLogger(MessageRecoverHandler.class);
@@ -46,24 +44,28 @@
 
     private MessageRecoverHandler() {}
 
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageRecoverBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageRecoverBody> evt) throws AMQException
     {
-        _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + evt.getChannelId());
-        AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        AMQChannel channel = session.getChannel(evt.getChannelId());
+        _logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
         if (channel == null)
         {
             throw new AMQException("Unknown channel " + evt.getChannelId());
         }
         MessageRecoverBody body = evt.getMethod();
-        if (body.requeue) {
+        if (body.requeue)
+        {
             channel.requeue();
-        } else {
-            channel.resend(protocolSession);
         }
-        MessageOkBody response = MessageOkBody.createMethodBody(protocolSession.getMajor(), protocolSession.getMinor());
-        protocolSession.writeResponse(evt, response);
+        else
+        {
+            channel.resend(session, false);
+        }
+        MessageOkBody response = MessageOkBody.createMethodBody(
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion()); // AMQP minor version
+        session.writeResponse(evt, response);
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageRejectBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageRejectHandler implements StateAwareMethodListener<MessageRejectBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageRejectHandler.class);
+
     private static MessageRejectHandler _instance = new MessageRejectHandler();
 
     public static MessageRejectHandler getInstance()
@@ -39,11 +41,8 @@
     }
 
     private MessageRejectHandler() {}
-    
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageRejectBody> evt)
-                                throws AMQException
+     
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageRejectBody> evt) throws AMQException
     {
 		// TODO
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageResumeBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageResumeHandler implements StateAwareMethodListener<MessageResumeBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageResumeHandler.class);
+
     private static MessageResumeHandler _instance = new MessageResumeHandler();
 
     public static MessageResumeHandler getInstance()
@@ -40,10 +42,7 @@
 
     private MessageResumeHandler() {}
     
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageResumeBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageResumeBody> evt) throws AMQException
     {
 		// TODO
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java Wed Feb 14 12:02:03 2007
@@ -20,21 +20,21 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.qpid.framing.MessageOkBody;
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
 
 public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody>
 {
@@ -47,50 +47,41 @@
         return _instance;
     }
 
-    private static final String UNKNOWN_EXCHANGE_NAME = "Unknown exchange name";
-
     private MessageTransferHandler() {}
 
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageTransferBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageTransferBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
         final MessageTransferBody body = evt.getMethod();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
 
-        if (_log.isDebugEnabled()) {
+        if (_log.isDebugEnabled())
+        {
             _log.debug("Publish received on channel " + evt.getChannelId());
         }
 
         // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
-        if (body.destination == null) {
+        if (body.destination == null)
+        {
             body.destination = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
         }
-        Exchange e = protocolSession.getExchangeRegistry().getExchange(body.destination);
+        Exchange e = exchangeRegistry.getExchange(body.destination);
         // if the exchange does not exist we raise a channel exception
-        if (e == null) {
-//             protocolSession.closeChannel(evt.getChannelId());
-//             // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
-//             // then we can remove the hardcoded 0,0
-//             // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-//             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-//             // Be aware of possible changes to parameter order as versions change.
-//             AMQMethodBody cf = ChannelCloseBody.createMethodBody
-//                 ((byte)0, (byte)9,	// AMQP version (major, minor)
-//                  MessageTransferBody.getClazz((byte)0, (byte)9),	// classId
-//                  MessageTransferBody.getMethod((byte)0, (byte)9),	// methodId
-//                  500,	// replyCode
-//                  UNKNOWN_EXCHANGE_NAME);	// replyText
-//             protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
-            protocolSession.closeChannelRequest(evt.getChannelId(), 500, UNKNOWN_EXCHANGE_NAME);
-        } else {
+        if (e == null)
+        {
+            session.closeChannelRequest(evt.getChannelId(), 500, new AMQShortString("Unknown exchange name"));
+        }
+        else
+        {
             // The partially populated BasicDeliver frame plus the received route body
             // is stored in the channel. Once the final body frame has been received
             // it is routed to the exchange.
-            AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
-            channel.addMessageTransfer(body, protocolSession);
-            protocolSession.writeResponse(evt, MessageOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor())); // AMQP minor version
+            AMQChannel channel = session.getChannel(evt.getChannelId());
+            channel.addMessageTransfer(body, session);
+            session.writeResponse(evt, MessageOkBody.createMethodBody(
+                session.getProtocolMajorVersion(), // AMQP major version
+                session.getProtocolMinorVersion())); // AMQP minor version
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Wed Feb 14 12:02:03 2007
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.QueueBindBody;
@@ -34,6 +33,9 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
 
 public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
 {
@@ -46,18 +48,21 @@
         return _instance;
     }
 
-    private QueueBindHandler()
-    {
-    }
+    private QueueBindHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<QueueBindBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
         final QueueBindBody body = evt.getMethod();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        
+
         final AMQQueue queue;
         if (body.queue == null)
         {
-            queue = protocolSession.getChannel(evt.getChannelId()).getDefaultQueue();
+            queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
             if (queue == null)
             {
                 throw new AMQException("No default queue defined on channel and queue was null");
@@ -69,14 +74,14 @@
         }
         else
         {
-            queue = protocolSession.getQueueRegistry().getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.queue);
         }
 
         if (queue == null)
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
         }
-        final Exchange exch = protocolSession.getExchangeRegistry().getExchange(body.exchange);
+        final Exchange exch = exchangeRegistry.getExchange(body.exchange);
         if (exch == null)
         {
             throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
@@ -91,9 +96,9 @@
         {
             // Be aware of possible changes to parameter order as versions change.
             final AMQMethodBody response = QueueBindOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor()); // AMQP minor version
-            protocolSession.writeResponse(evt, response);
+                session.getProtocolMajorVersion(), // AMQP major version
+                session.getProtocolMinorVersion()); // AMQP minor version
+            session.writeResponse(evt, response);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed Feb 14 12:02:03 2007
@@ -20,24 +20,27 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.configuration.Configured;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeclareOkBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
 
 import java.text.MessageFormat;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -58,28 +61,27 @@
 
     private final AtomicInteger _counter = new AtomicInteger();
 
-    private final MessageStore _store;
-
     protected QueueDeclareHandler()
     {
         Configurator.configure(this);
-        _store = ApplicationRegistry.getInstance().getMessageStore();
     }
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
     {
-        QueueDeclareBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final QueueDeclareBody body = evt.getMethod();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        MessageStore store = virtualHost.getMessageStore();
 
         // if we aren't given a queue name, we create one which we return to the client
         if (body.queue == null)
         {
             body.queue = createName();
         }
-        //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
 
         AMQQueue queue = null;
-        QueueRegistry queueRegistry = protocolSession.getQueueRegistry();
         synchronized (queueRegistry)
         {
             if ((queue = queueRegistry.getQueue(body.queue)) == null)
@@ -92,22 +94,22 @@
                 }
                 else
                 {
-                    queue = createQueue(body, queueRegistry, protocolSession);
+                    queue = createQueue(body, virtualHost, session);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
-                        _store.createQueue(queue);
+                        store.createQueue(queue);
                     }
                     queueRegistry.registerQueue(queue);
                     if (autoRegister)
                     {
-                        Exchange defaultExchange = protocolSession.getExchangeRegistry().getExchange("amq.direct");
+                        Exchange defaultExchange = exchangeRegistry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
                         defaultExchange.registerQueue(body.queue, queue, null);
                         queue.bind(body.queue, defaultExchange);
                         _log.info("Queue " + body.queue + " bound to default exchange");
                     }
                 }
             }
-            else if(queue.getOwner() != null && !protocolSession.getContextKey().equals(queue.getOwner()))
+            else if(queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
             {
                 // todo - constant
                 throw body.getChannelException(405, "Cannot declare queue, as exclusive queue with same name declared on another connection");        
@@ -118,25 +120,26 @@
                 _log.info("Queue " + body.queue + " exists and is accesible to this connection [owner=" + queue.getOwner() +"]");
             }
             //set this as the default queue on the channel:
-            protocolSession.getChannel(evt.getChannelId()).setDefaultQueue(queue);
+            session.getChannel(evt.getChannelId()).setDefaultQueue(queue);
         }
+
         if (!body.nowait)
         {
             // Be aware of possible changes to parameter order as versions change.
             AMQMethodBody response = QueueDeclareOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor(), // AMQP minor version
+                session.getProtocolMajorVersion(), // AMQP major version
+                session.getProtocolMinorVersion(), // AMQP minor version
                 queue.getConsumerCount(), // consumerCount
                 queue.getMessageCount(), // messageCount
                 body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
-            protocolSession.writeResponse(evt, response);
+            session.writeResponse(evt, response);
         }
     }
 
-    protected String createName()
+    protected AMQShortString createName()
     {
-        return "tmp_" + pad(_counter.incrementAndGet());
+        return new AMQShortString("tmp_" + pad(_counter.incrementAndGet()));
     }
 
     protected static String pad(int value)
@@ -144,13 +147,13 @@
         return MessageFormat.format("{0,number,0000000000000}", value);
     }
 
-    protected AMQQueue createQueue(QueueDeclareBody body, final QueueRegistry registry, final AMQProtocolSession session)
+    protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, final AMQProtocolSession session)
             throws AMQException
     {
-        String owner = body.exclusive ? session.getContextKey() : null;
-        if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner);
-        final AMQQueue queue =  new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
-        final String queueName = queue.getName();
+        final QueueRegistry registry = virtualHost.getQueueRegistry();
+        AMQShortString owner = body.exclusive ? session.getContextKey() : null;
+        final AMQQueue queue =  new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+        final AMQShortString queueName = queue.getName();
 
         if(body.exclusive && !body.durable)
         {
@@ -177,7 +180,6 @@
                     session.removeSessionCloseTask(deleteQueueTask);
                 }
             });
-
 
         }
         return queue;

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed Feb 14 12:02:03 2007
@@ -20,21 +20,25 @@
  */
 package org.apache.qpid.server.handler;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
 
 public class QueueDeleteHandler  implements StateAwareMethodListener<QueueDeleteBody>
 {
+    //private static final Logger _log = Logger.getLogger(QueueDeleteHandler.class);
+    
     private static final QueueDeleteHandler _instance = new QueueDeleteHandler();
 
     public static QueueDeleteHandler getInstance()
@@ -43,7 +47,6 @@
     }
 
     private final boolean _failIfNotFound;
-    private final MessageStore _store;
 
     public QueueDeleteHandler()
     {
@@ -53,12 +56,15 @@
     public QueueDeleteHandler(boolean failIfNotFound)
     {
         _failIfNotFound = failIfNotFound;
-        _store = ApplicationRegistry.getInstance().getMessageStore();
-
     }
 
-    public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager,  AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        VirtualHost virtualHost = session.getVirtualHost();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        MessageStore store = virtualHost.getMessageStore();
+
         QueueDeleteBody body = evt.getMethod();
         AMQQueue queue;
         if(body.queue == null)
@@ -67,7 +73,7 @@
         }
         else
         {
-            queue = session.getQueueRegistry().getQueue(body.queue);
+            queue = queueRegistry.getQueue(body.queue);
         }
 
         if(queue == null)
@@ -92,12 +98,12 @@
             else
             {
                 int purged = queue.delete(body.ifUnused, body.ifEmpty);
-                _store.removeQueue(queue.getName());
+                store.removeQueue(queue.getName());
                 // Be aware of possible changes to parameter order as versions change.
                 session.writeResponse(evt, QueueDeleteOkBody.createMethodBody(
-                                                session.getMajor(), // AMQP major version
-                                                session.getMinor(), // AMQP minor version
-                                                purged));	// messageCount
+                    session.getProtocolMajorVersion(), // AMQP major version
+                    session.getProtocolMinorVersion(), // AMQP minor version
+                    purged));	// messageCount
             }
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Wed Feb 14 12:02:03 2007
@@ -1,20 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.handler;
 
-import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
 
 public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
 {
+    //private static final Logger _log = Logger.getLogger(QueuePurgeHandler.class);
+    
     private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
 
     public static QueuePurgeHandler getInstance()
@@ -34,9 +61,11 @@
         _failIfNotFound = failIfNotFound;
     }
 
-    public void methodReceived(AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
     {
-        QueueRegistry queueRegistry = session.getQueueRegistry();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        VirtualHost virtualHost = session.getVirtualHost();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
         QueuePurgeBody body = evt.getMethod();
         AMQQueue queue;
@@ -66,15 +95,16 @@
         }
         else
         {
-            long purged = queue.clearQueue();
-            
+            AMQChannel channel = session.getChannel(evt.getChannelId());
+            long purged = queue.clearQueue(channel.getStoreContext());
             
             if(!body.nowait)
             {
-                AMQMethodBody response 
-                    = QueuePurgeOkBody.createMethodBody(session.getMajor(), session.getMinor(), purged);
+                AMQMethodBody response = QueuePurgeOkBody.createMethodBody(
+                    session.getProtocolMajorVersion(), // AMQP major version
+                    session.getProtocolMinorVersion(), // AMQP minor version
+                    purged);
                 session.writeResponse(evt, response);
-
             }
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Wed Feb 14 12:02:03 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,14 +25,16 @@
 import org.apache.qpid.framing.TxCommitOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+import org.apache.log4j.Logger;
+
 public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
 {
+    private static final Logger _log = Logger.getLogger(TxCommitHandler.class);
+
     private static TxCommitHandler _instance = new TxCommitHandler();
 
     public static TxCommitHandler getInstance()
@@ -44,19 +46,26 @@
     {
     }
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<TxCommitBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxCommitBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();        
 
-        try{
-            AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+        try
+        {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Commit received on channel " + evt.getChannelId());
+            }
+            AMQChannel channel = session.getChannel(evt.getChannelId());
             channel.commit();
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeResponse(evt, TxCommitOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor())); // AMQP minor version
-            channel.processReturns(protocolSession);
-        }catch(AMQException e){
+            session.writeResponse(evt, TxCommitOkBody.createMethodBody(
+                session.getProtocolMajorVersion(), // AMQP major version
+                session.getProtocolMinorVersion())); // AMQP minor version
+            channel.processReturns(session);
+        }
+        catch(AMQException e)
+        {
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Wed Feb 14 12:02:03 2007
@@ -24,15 +24,17 @@
 import org.apache.qpid.framing.TxRollbackBody;
 import org.apache.qpid.framing.TxRollbackOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.AMQChannel;
+
+//import org.apache.log4j.Logger;
 
 public class TxRollbackHandler implements StateAwareMethodListener<TxRollbackBody>
 {
+    //private static final Logger _log = Logger.getLogger(TxRollbackHandler.class);
+    
     private static TxRollbackHandler _instance = new TxRollbackHandler();
 
     public static TxRollbackHandler getInstance()
@@ -44,20 +46,24 @@
     {
     }
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
     {
-        try{
-            AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        
+        try
+        {
+            AMQChannel channel = session.getChannel(evt.getChannelId());
             channel.rollback();
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeResponse(evt, TxRollbackOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor())); // AMQP minor version
+            session.writeResponse(evt, TxRollbackOkBody.createMethodBody(
+                session.getProtocolMajorVersion(), // AMQP major version
+                session.getProtocolMinorVersion())); // AMQP minor version
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
-            channel.resend(protocolSession);
-        }catch(AMQException e){
+            channel.resend(session, false);
+        }
+        catch(AMQException e)
+        {
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Wed Feb 14 12:02:03 2007
@@ -24,14 +24,16 @@
 import org.apache.qpid.framing.TxSelectBody;
 import org.apache.qpid.framing.TxSelectOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class TxSelectHandler implements StateAwareMethodListener<TxSelectBody>
 {
+    //private static final Logger _log = Logger.getLogger(TxSelectHandler.class);
+    
     private static TxSelectHandler _instance = new TxSelectHandler();
 
     public static TxSelectHandler getInstance()
@@ -43,13 +45,14 @@
     {
     }
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<TxSelectBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
     {
-        protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        
+        session.getChannel(evt.getChannelId()).setLocalTransactional();
         // Be aware of possible changes to parameter order as versions change.
-        protocolSession.writeResponse(evt, TxSelectOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor())); // AMQP minor version
+        session.writeResponse(evt, TxSelectOkBody.createMethodBody(
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion())); // AMQP minor version
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Wed Feb 14 12:02:03 2007
@@ -67,7 +67,7 @@
     {
         try
         {
-            ApplicationRegistry.getInstance().getManagedObjectRegistry().registerObject(this);
+            getManagedObjectRegistry().registerObject(this);
         }
         catch (JMException e)
         {
@@ -75,11 +75,16 @@
         }
     }
 
+    protected ManagedObjectRegistry getManagedObjectRegistry()
+    {
+        return ApplicationRegistry.getInstance().getManagedObjectRegistry();
+    }
+
     public void unregister() throws AMQException
     {
         try
         {
-            ApplicationRegistry.getInstance().getManagedObjectRegistry().unregisterObject(this);
+            getManagedObjectRegistry().unregisterObject(this);
         }
         catch (JMException e)
         {
@@ -91,14 +96,14 @@
     {
         return getObjectInstanceName() + "[" + getType() + "]";
     }
+    
 
     /**
      * Created the ObjectName as per the JMX Specs
      * @return ObjectName
      * @throws MalformedObjectNameException
      */
-    public ObjectName getObjectName()
-        throws MalformedObjectNameException
+    public ObjectName getObjectName() throws MalformedObjectNameException
     {
         String name = getObjectInstanceName();
         StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
@@ -113,26 +118,41 @@
         return new ObjectName(objectName.toString());
     }
 
-    private String getHierarchicalType(ManagedObject obj)
+    protected ObjectName getObjectNameForSingleInstanceMBean() throws MalformedObjectNameException
+    {
+        StringBuffer objectName = new StringBuffer(ManagedObject.DOMAIN);
+
+        objectName.append(":type=");
+        objectName.append(getHierarchicalType(this));
+
+        String hierarchyName = getHierarchicalName(this);
+        if (hierarchyName != null)
+        {
+            objectName.append(",");
+            objectName.append(hierarchyName.substring(0, hierarchyName.lastIndexOf(",")));
+        }
+
+        return new ObjectName(objectName.toString());
+    }
+
+    protected String getHierarchicalType(ManagedObject obj)
     {
-        String parentType = null;
         if (obj.getParentObject() != null)
         {
-            parentType = getHierarchicalType(obj.getParentObject()).toString();
+            String parentType = getHierarchicalType(obj.getParentObject()).toString();
             return parentType + "." + obj.getType();
         }
         else
             return obj.getType();
     }
 
-    private String getHierarchicalName(ManagedObject obj)
+    protected String getHierarchicalName(ManagedObject obj)
     {
-        String parentName = null;
         if (obj.getParentObject() != null)
         {
-            parentName = obj.getParentObject().getType() + "=" +
-                         obj.getParentObject().getObjectInstanceName() + ","+
-                         getHierarchicalName(obj.getParentObject());
+            String parentName = obj.getParentObject().getType() + "=" +
+                                obj.getParentObject().getObjectInstanceName() + ","+
+                                getHierarchicalName(obj.getParentObject());
 
             return parentName;
         }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java Wed Feb 14 12:02:03 2007
@@ -37,7 +37,7 @@
  */
 public interface ManagedBroker
 {
-    static final String TYPE = "BrokerManager";
+    static final String TYPE = "VirtualHostManager";
 
     /**
      * Creates a new Exchange.

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Wed Feb 14 12:02:03 2007
@@ -23,11 +23,14 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ConnectionStartBody;
@@ -49,14 +52,16 @@
 import org.apache.qpid.framing.ResponseManager;
 import org.apache.qpid.framing.RequestResponseMappingException;
 import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.MainRegistry;
+import org.apache.qpid.framing.VersionSpecificRegistry;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.pool.ReadWriteThreadModel;
 
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -65,6 +70,9 @@
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -76,7 +84,7 @@
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AMQMinaProtocolSession implements AMQProtocolSession,
                                                ProtocolVersionList,
@@ -86,19 +94,26 @@
 
     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
+    // To avoid boxing the channel id and doing a map lookup, low-numbered channels are also cached
+    // in an array.  This value is used as a bit mask, so it must be of the form 2^x - 1.
+    private static final int CHANNEL_CACHE_SIZE = 0xff;
+
     private final IoSession _minaProtocolSession;
 
-    private String _contextKey;
+    private AMQShortString _contextKey;
+
+    private VirtualHost _virtualHost;
 
     private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
 
+    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1];
+
     private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
 
     private final AMQStateManager _stateManager;
 
-    private final QueueRegistry _queueRegistry;
-
-    private final ExchangeRegistry _exchangeRegistry;
+    private ExchangeRegistry _exchangeRegistry;
+    private MessageStore _messageStore;
 
     private AMQCodecFactory _codecFactory;
 
@@ -118,15 +133,15 @@
     private long _maxFrameSize = 65536;
 
     /* AMQP Version for this session */
-    private byte _major;
-    private byte _minor;
+    private byte _major = pv[pv.length-1][PROTOCOL_MAJOR];
+    private byte _minor = pv[pv.length-1][PROTOCOL_MINOR];
     private FieldTable _clientProperties;
-
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
 
     // Keeps a tally of connections for logging and debugging
-    private static AtomicInteger _ConnectionId;    
-    static { _ConnectionId = new AtomicInteger(0); }
+    private static AtomicLong _ConnectionId;    
+    static { _ConnectionId = new AtomicLong(0L); }
 
     public ManagedObject getManagedObject()
     {
@@ -134,26 +149,41 @@
     }
 
 
-    public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+    public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
                                   AMQCodecFactory codecFactory)
             throws AMQException
     {
         _ConnectionId.incrementAndGet();
-        _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+        _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _minaProtocolSession = session;
         session.setAttachment(this);
-        _frameListeners.add(_stateManager);
-        _queueRegistry = queueRegistry;
-        _exchangeRegistry = exchangeRegistry;
         _codecFactory = codecFactory;
-        _managedObject = createMBean();
-        _managedObject.register();
+        _exchangeRegistry = null;
+        _messageStore = null;
+
         _closePending = false;
         _closed = false;
-        createChannel(0);
+        createChannel(0); // Required to handle requests / responses for channel 0
+        
+        _frameListeners.add(_stateManager);
+        _managedObject = createMBean();
+        _managedObject.register();
+
+       try
+       {
+           IoServiceConfig config = session.getServiceConfig();
+           ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
+           threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+           threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+       }
+       catch (RuntimeException e)
+       {
+           e.printStackTrace();
+       //    throw e;
+       }
     }
 
-    public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+     public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
                                   AMQCodecFactory codecFactory, AMQStateManager stateManager)
             throws AMQException
     {
@@ -161,15 +191,14 @@
         _stateManager = stateManager;
         _minaProtocolSession = session;
         session.setAttachment(this);
-        _frameListeners.add(_stateManager);
-        _queueRegistry = queueRegistry;
-        _exchangeRegistry = exchangeRegistry;
+
         _codecFactory = codecFactory;
-        _managedObject = createMBean();
-        _managedObject.register();
+        _exchangeRegistry = null;
+        _messageStore = null;
+
         _closePending = false;
         _closed = false;
-        createChannel(0);
+        createChannel(0); // Required to handle requests / responses for channel 0
     }
 
     private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -197,8 +226,7 @@
 
     private AMQChannel createChannel(int id) throws AMQException
     {
-        IApplicationRegistry registry = ApplicationRegistry.getInstance();
-        AMQChannel channel = new AMQChannel(id, this, registry.getMessageStore(),
+        AMQChannel channel = new AMQChannel(id, this, _messageStore,
                                             _exchangeRegistry, _stateManager);
         addChannel(channel);
         return channel;
@@ -217,10 +245,12 @@
             try
             {
                 pi.checkVersion(this); // Fails if not correct
+
                 // This sets the protocol version (and hence framing classes) for this session.
-                _major = pi.protocolMajor;
-                _minor = pi.protocolMinor;
+                setProtocolVersion(pi.protocolMajor,pi.protocolMinor);
+
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+
                 String locales = "en_US";
                 // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
                 AMQMethodBody connectionStartBody = ConnectionStartBody.createMethodBody
@@ -251,15 +281,15 @@
         else if(!_closed)
         {
             AMQFrame frame = (AMQFrame) message;
-            AMQChannel channel = getChannel(frame.channel);
+            AMQChannel channel = getChannel(frame.getChannel());
 
             if (_closePending)
             {
                 // If a close is pending (ie ChannelClose has been sent, but no ChannelCloseOk received), then
                 // all methods except ChannelCloseOk must be rejected. (AMQP spec)
-                if((frame.bodyFrame instanceof AMQRequestBody))
+                if((frame.getBodyFrame() instanceof AMQRequestBody))
                     throw new AMQException("Incoming request frame on connection which is pending close.");
-                AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+                AMQRequestBody requestBody = (AMQRequestBody)frame.getBodyFrame();
                 if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody))
                     throw new AMQException("Incoming frame on closing connection is not a Connection.CloseOk method.");
             }
@@ -271,9 +301,9 @@
                 // b. Have a request id of 1 (i.e. the first request on a new channel);
                 // c. Must be a ConnectionOpenBody method.
                 // Throw an exception for all other incoming frames on an unopened channel
-                if(!(frame.bodyFrame instanceof AMQRequestBody))
+                if(!(frame.getBodyFrame() instanceof AMQRequestBody))
                     throw new AMQException("Incoming frame on unopened channel is not a request.");
-                AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+                AMQRequestBody requestBody = (AMQRequestBody)frame.getBodyFrame();
                 if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody)) {
                     closeSessionRequest(
                         requestBody.getMethodPayload().getConnectionException(
@@ -283,16 +313,16 @@
                 }
                 if (requestBody.getRequestId() != 1)
                     throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1.");
-                channel = createChannel(frame.channel);
+                channel = createChannel(frame.getChannel());
             }
 
-            if (frame.bodyFrame instanceof AMQRequestBody)
+            if (frame.getBodyFrame() instanceof AMQRequestBody)
             {
-            	requestFrameReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
+            	requestFrameReceived(frame.getChannel(), (AMQRequestBody)frame.getBodyFrame());
             }
-            else if (frame.bodyFrame instanceof AMQResponseBody)
+            else if (frame.getBodyFrame() instanceof AMQResponseBody)
             {
-            	responseFrameReceived(frame.channel, (AMQResponseBody)frame.bodyFrame);
+            	responseFrameReceived(frame.getChannel(), (AMQResponseBody)frame.getBodyFrame());
             }
             else
             {
@@ -303,10 +333,11 @@
     
     private void requestFrameReceived(int channelNum, AMQRequestBody requestBody) throws Exception
     {
-        try{
+        try
+        {
             if (_logger.isDebugEnabled())
             {
-                    _logger.debug("Request frame received: " + requestBody);
+                _logger.debug("Request frame received: " + requestBody);
             }
             AMQChannel channel = getChannel(channelNum);
             ResponseManager responseManager = channel.getResponseManager();
@@ -383,12 +414,12 @@
         _minaProtocolSession.write(frame);
     }
 
-    public String getContextKey()
+    public AMQShortString getContextKey()
     {
         return _contextKey;
     }
 
-    public void setContextKey(String contextKey)
+    public void setContextKey(AMQShortString contextKey)
     {
         _contextKey = contextKey;
     }
@@ -400,7 +431,9 @@
 
     public AMQChannel getChannel(int channelId)
     {
-        return _channelMap.get(channelId);
+        return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+                ? _cachedChannels[channelId]
+                : _channelMap.get(channelId);
     }
 
     public void addChannel(AMQChannel channel)
@@ -410,7 +443,13 @@
             throw new IllegalStateException("Session is closed");
         }
 
-        _channelMap.put(channel.getChannelId(), channel);
+        final int channelId = channel.getChannelId();
+        _channelMap.put(channelId, channel);
+
+        if(((channelId & CHANNEL_CACHE_SIZE) == channelId))
+        {
+            _cachedChannels[channelId] = channel;
+        }
         checkForNotification();
     }
 
@@ -450,7 +489,7 @@
     }
     
     // Used to initiate a channel close from the server side and inform the client
-    public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+    public void closeChannelRequest(int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
         final AMQChannel channel = _channelMap.get(channelId);
         if (channel == null)
@@ -481,7 +520,7 @@
     // Used to close a channel as a response to a client close request
     public void closeChannelResponse(int channelId, long requestId) throws AMQException
     {
-        final AMQChannel channel = _channelMap.get(channelId);
+        final AMQChannel channel = getChannel(channelId);
         if (channel == null)
         {
             throw new IllegalArgumentException("Unknown channel id");
@@ -497,13 +536,14 @@
             }
             finally
             {
-                _channelMap.remove(channelId);
+                removeChannel(channelId);
+
             }
         }
     }
 
     // Used to initiate a connection close from the server side and inform the client
-    public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException
+    public void closeSessionRequest(int replyCode, AMQShortString replyText, int classId, int methodId) throws AMQException
     {
         _closePending = true; // This prevents all methods except Close-Ok from being accepted
         _stateManager.changeState(AMQState.CONNECTION_CLOSING);
@@ -523,7 +563,7 @@
         closeSession();
     }
     
-    public void closeSessionRequest(int replyCode, String replyText) throws AMQException
+    public void closeSessionRequest(int replyCode, AMQShortString replyText) throws AMQException
     {
         closeSessionRequest(replyCode, replyText, 0, 0);
     }
@@ -531,7 +571,7 @@
 
     public void closeSessionRequest(AMQConnectionException e) throws AMQException
     {
-        closeSessionRequest(e.getErrorCode(), e.getMessage(), e.getClassId(), e.getMethodId());
+        closeSessionRequest(e.getErrorCode(), new AMQShortString(e.getMessage()), e.getClassId(), e.getMethodId());
     }
 
     
@@ -569,6 +609,10 @@
     public void removeChannel(int channelId)
     {
         _channelMap.remove(channelId);
+        if((channelId & CHANNEL_CACHE_SIZE) == channelId)
+        {
+            _cachedChannels[channelId] = null;
+        }
     }
 
     /**
@@ -614,6 +658,10 @@
             channel.close(this);
         }
         _channelMap.clear();
+        for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
+        {
+            _cachedChannels[i]=null;
+        }
     }
 
     public String toString()
@@ -679,52 +727,46 @@
         _clientProperties = clientProperties;
         if((_clientProperties != null) && (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null))
         {
-            setContextKey(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE));
+            setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
         }
     }
     
-    public QueueRegistry getQueueRegistry()
-    {
-        return _queueRegistry;
-    }
-    
-    public ExchangeRegistry getExchangeRegistry()
-    {
-        return _exchangeRegistry;
-    }
-    
     public AMQStateManager getStateManager()
     {
         return _stateManager;
     }
 
-    /**
-     * Convenience methods for managing AMQP version.
-     * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
-     */
+    private void setProtocolVersion(byte major, byte minor)
+    {
+        _major = major;
+        _minor = minor;
+        _registry = MainRegistry.getVersionSpecificRegistry(major,minor);
+    }
 
-    public byte getMajor()
+    public byte getProtocolMajorVersion()
     {
         return _major;
     }
 
-    public byte getMinor()
+    public byte getProtocolMinorVersion()
     {
         return _minor;
     }
 
-    public boolean versionEquals(byte major, byte minor)
+    public boolean isProtocolVersionEqual(byte major, byte minor)
     {
         return _major == major && _minor == minor;
     }
 
-    public void checkMethodBodyVersion(AMQMethodBody methodBody) {
-        if (!versionEquals(methodBody.getMajor(), methodBody.getMinor())) {
+    public void checkMethodBodyVersion(AMQMethodBody methodBody)
+    {
+        if (!isProtocolVersionEqual(methodBody.getMajor(), methodBody.getMinor()))
+        {
             throw new RuntimeException("MethodBody version did not match version of current session.");
         }
     }
     
-    public int getConnectionId()
+    public long getConnectionId()
     {
         return _ConnectionId.get();
     }
@@ -742,5 +784,25 @@
     public void removeSessionCloseTask(Task task)
     {
         _taskList.remove(task);
+    }
+
+    public VersionSpecificRegistry getRegistry()
+    {
+        return _registry;
+    }
+
+
+    public VirtualHost getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    public void setVirtualHost(VirtualHost virtualHost) throws AMQException
+    {
+        _virtualHost = virtualHost;
+        _exchangeRegistry = virtualHost.getExchangeRegistry();
+        _messageStore = virtualHost.getMessageStore();
+        _managedObject = createMBean();
+        _managedObject.register();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Wed Feb 14 12:02:03 2007
@@ -54,41 +54,26 @@
 {
     private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
 
-    /**
-     * The registry of all queues. This is passed to frame listeners when frame
-     * events occur.
-     */
-    private final QueueRegistry _queueRegistry;
+    private final IApplicationRegistry _applicationRegistry;
 
-    /**
-     * The registry of all exchanges. This is passed to frame listeners when frame
-     * events occur.
-     */
-    private final ExchangeRegistry _exchangeRegistry;
 
     private boolean _useSSL;
 
     public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
     {
-        IApplicationRegistry registry = ApplicationRegistry.getInstance(applicationRegistryInstance);
-
-        _queueRegistry = registry.getQueueRegistry();
-        _exchangeRegistry = registry.getExchangeRegistry();
-        _logger.debug("AMQPFastProtocolHandler created");
+        this(ApplicationRegistry.getInstance(applicationRegistryInstance));
     }
 
-    public AMQPFastProtocolHandler(QueueRegistry queueRegistry,
-                                   ExchangeRegistry exchangeRegistry)
+    public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
     {
-        _queueRegistry = queueRegistry;
-        _exchangeRegistry = exchangeRegistry;
+        _applicationRegistry = applicationRegistry;
 
         _logger.debug("AMQPFastProtocolHandler created");
     }
 
     protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
     {
-        this(handler._queueRegistry, handler._exchangeRegistry);
+        this(handler._applicationRegistry);
     }
 
     public void sessionCreated(IoSession protocolSession) throws Exception
@@ -96,7 +81,7 @@
         SessionUtil.initialize(protocolSession);
         final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
 
-        createSession(protocolSession, _queueRegistry, _exchangeRegistry, codecFactory);
+        createSession(protocolSession, _applicationRegistry, codecFactory);
         _logger.info("Protocol session created");
 
         final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory);
@@ -121,9 +106,9 @@
     /**
      * Separated into its own, protected, method to allow easier reuse
      */
-    protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+    protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
     {
-        new AMQMinaProtocolSession(session, queues, exchanges, codec);
+        new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
     }
 
     public void sessionOpened(IoSession protocolSession) throws Exception
@@ -135,7 +120,11 @@
     {
         _logger.info("Protocol Session closed");
         final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
-        amqProtocolSession.closeSession();
+        // FIXME: amqProtocolSession can be null here, so guard against it before closing
+        if(amqProtocolSession != null)
+        {
+            amqProtocolSession.closeSession();
+        }
     }
 
     public void sessionIdle(IoSession session, IdleStatus status) throws Exception
@@ -154,8 +143,7 @@
 
     }
 
-    public void exceptionCaught(IoSession protocolSession, AMQMethodListener methodListener,
-        Throwable throwable) throws Exception
+    public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
     {
         AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
         if (throwable instanceof AMQProtocolHeaderException)
@@ -176,7 +164,7 @@
         {
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
             // TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right!
-            session.closeSessionRequest(200, throwable.getMessage());
+            session.closeSessionRequest(200, new AMQShortString(throwable.getMessage()));
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java Wed Feb 14 12:02:03 2007
@@ -42,8 +42,7 @@
     public AMQPProtocolProvider()
     {
         IApplicationRegistry registry = ApplicationRegistry.getInstance();
-        _handler = new AMQPFastProtocolHandler(registry.getQueueRegistry(),
-                                               registry.getExchangeRegistry());
+        _handler = new AMQPFastProtocolHandler(registry);
     }
 
     public AMQPFastProtocolHandler getHandler()