You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC

svn commit: r507672 [4/16] - in /incubator/qpid/branches/qpid.0-9: gentools/src/org/apache/qpid/gentools/ gentools/templ.java/ gentools/templ.net/ java/ java/broker/ java/broker/bin/ java/broker/distribution/ java/broker/distribution/src/ java/broker/d...

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Wed Feb 14 12:02:03 2007
@@ -20,15 +20,9 @@
  */
 package org.apache.qpid.server.filter;
 
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidSelectorException;
 import org.apache.log4j.Logger;
-
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
 
 public class NoConsumerFilter implements MessageFilter
 {
@@ -36,7 +30,7 @@
 
 
     public NoConsumerFilter() throws AMQException
-    {        
+    {
         _logger.info("Created NoConsumerFilter");
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Wed Feb 14 12:02:03 2007
@@ -21,19 +21,12 @@
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
 //
          
-import java.io.IOException;
+//import java.io.IOException;
 import java.util.HashMap;
 
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-
-//import org.apache.activemq.command.ActiveMQDestination;
-//import org.apache.activemq.command.Message;
-//import org.apache.activemq.command.TransactionId;
-//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.message.jms.JMSMessage;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
@@ -44,41 +37,11 @@
  */
 public class PropertyExpression implements Expression
 {
-
-    interface SubExpression
-    {
-        public Object evaluate(AMQMessage message);
-    }
-
-    interface JMSExpression
-    {
-        public abstract Object evaluate(JMSMessage message);
-    }
-
-    static class SubJMSExpression implements SubExpression
-    {
-        JMSExpression _expression;
-
-        SubJMSExpression(JMSExpression expression)
-        {
-            _expression = expression;
-        }
-
-
-        public Object evaluate(AMQMessage message)
-        {
-            JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
-            if (msg != null)
-            {
-                return _expression.evaluate(msg);
-            }
-            else
-            {
-                return null;
-            }
-        }
-    }
-
+    // Constants - defined the same as JMS
+    private static final int NON_PERSISTENT = 1;
+    private static final int PERSISTENT = 2;
+    private static final int DEFAULT_PRIORITY = 4;
+    
     private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
 
 
@@ -86,62 +49,43 @@
 
     static
     {
-        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDestination",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getJMSDestination();
+                        return message.getDestination();
                     }
                 }
-        ));
-//
-//            public Object evaluate(AMQMessage message)
-//            {
-//                //fixme
-//
-//
-////                AMQDestination dest = message.getOriginalDestination();
-////                if (dest == null)
-////                {
-////                    dest = message.getDestination();
-////                }
-////                if (dest == null)
-////                {
-////                    return null;
-////                }
-////                return dest.toString();
-//                return "";
-//            }
-//        });
-        JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression(
-                new JMSExpression()
+        );
+        JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getJMSReplyTo();
+                        return message.getReplyTo();
                     }
-                })
+                }
         );
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSType",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getJMSType();
+                        return message.getType();
                     }
                 }
-        ));
+        );
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
                         try
                         {
-                            Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                            int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
                             _logger.info("JMSDeliveryMode is :" + mode);
                             return mode;
                         }
@@ -150,83 +94,83 @@
                             //shouldn't happen
                         }
 
-                        return DeliveryMode.NON_PERSISTENT;
+                        return NON_PERSISTENT;
                     }
-                }));
+                });
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSPriority",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getJMSPriority();
+                        return message.getPriority();
                     }
                 }
-        ));
+        );
 
 
-        JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getAMQMessage().getMessageId();
+                        return message.getMessageId();
                     }
                 }
-        ));
+        );
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getJMSTimestamp();
+                        return message.getTimestamp();
                     }
                 }
-        ));
+        );
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getJMSCorrelationID();
+                        return message.getCorrelationId();
                     }
                 }
-        ));
+        );
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getJMSExpiration();
+                        return message.getExpiration();
                     }
                 }
-        ));
+        );
 
-        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression(
-                new JMSExpression()
+        JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered",
+                new Expression()
                 {
-                    public Object evaluate(JMSMessage message)
+                    public Object evaluate(AMQMessage message)
                     {
-                        return message.getAMQMessage().isRedelivered();
+                        return message.isRedelivered();
                     }
                 }
-        ));
+        );
 
     }
 
-    private final String name;
-    private final SubExpression jmsPropertyExpression;
+    private final AMQShortString name;
+    private final Expression jmsPropertyExpression;
 
-    public PropertyExpression(String name)
+    public PropertyExpression(AMQShortString name)
     {
         this.name = name;
-        jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
+        jmsPropertyExpression = (Expression) JMS_PROPERTY_EXPRESSIONS.get(name);
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException
+    public Object evaluate(AMQMessage message) throws AMQException
     {
 //        try
 //        {
@@ -252,21 +196,21 @@
         }
 //            catch (IOException ioe)
 //            {
-//                JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage());
+//                AMQException exception = new AMQException("Could not get property: " + name + " reason: " + ioe.getMessage());
 //                exception.initCause(ioe);
 //                throw exception;
 //            }
 //        }
 //        catch (IOException e)
 //        {
-//            JMSException exception = new JMSException(e.getMessage());
+//            AMQException exception = new AMQException(e.getMessage());
 //            exception.initCause(e);
 //            throw exception;
 //        }
 
     }
 
-    public String getName()
+    public AMQShortString getName()
     {
         return name;
     }
@@ -277,7 +221,7 @@
      */
     public String toString()
     {
-        return name;
+        return name.asString();
     }
 
     /**

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Wed Feb 14 12:02:03 2007
@@ -20,16 +20,15 @@
  */
 package org.apache.qpid.server.filter;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQException;
 
-import javax.jms.JMSException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class SimpleFilterManager implements FilterManager
 {
-    private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class);
+    private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
 
     private final ConcurrentLinkedQueue<MessageFilter> _filters;
 
@@ -60,10 +59,10 @@
                     return false;
                 }
             }
-            catch (JMSException e)
+            catch (AMQException e)
             {
                 //fixme
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                e.printStackTrace();  
                 return false;
             }
         }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Wed Feb 14 12:02:03 2007
@@ -20,8 +20,8 @@
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
 //
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.message.jms.JMSMessage;
 
 import java.math.BigDecimal;
 import java.util.Collection;
@@ -29,8 +29,6 @@
 import java.util.Iterator;
 import java.util.List;
 
-import javax.jms.JMSException;
-
 /**
  * An expression which performs an operation on two expression values
  * 
@@ -43,7 +41,8 @@
 
     public static Expression createNegate(Expression left) {
         return new UnaryExpression(left) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException
+            {
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) {
                     return null;
@@ -74,7 +73,7 @@
     	final Collection inList = t;
     	
         return new BooleanUnaryExpression(right) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
             	
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) {
@@ -126,7 +125,7 @@
             super(left);
         }
 
-        public boolean matches(AMQMessage message) throws JMSException {
+        public boolean matches(AMQMessage message) throws AMQException {
             Object object = evaluate(message);
             return object!=null && object==Boolean.TRUE;            
         }
@@ -135,7 +134,7 @@
         
     public static BooleanExpression createNOT(BooleanExpression left) {
         return new BooleanUnaryExpression(left) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
                 Boolean lvalue = (Boolean) right.evaluate(message);
                 if (lvalue == null) {
                     return null;
@@ -159,7 +158,7 @@
 
     public static BooleanExpression createBooleanCast(Expression left) {
         return new BooleanUnaryExpression(left) {
-            public Object evaluate(AMQMessage message) throws JMSException {
+            public Object evaluate(AMQMessage message) throws AMQException {
                 Object rvalue = right.evaluate(message);
                 if (rvalue == null) 
                     return null;

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,14 @@
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
 //
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import javax.jms.JMSException;
-
-//import org.apache.activemq.command.Message;
-//import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
 /**
  * Used to evaluate an XPath Expression in a JMS selector.
  */
@@ -75,7 +71,7 @@
     private final XPathEvaluator evaluator;
     
     static public interface XPathEvaluator {
-        public boolean evaluate(AMQMessage message) throws JMSException;
+        public boolean evaluate(AMQMessage message) throws AMQException;
     }    
     
     XPathExpression(String xpath) {
@@ -97,7 +93,7 @@
         }
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException {
+    public Object evaluate(AMQMessage message) throws AMQException {
 //        try {
 //FIXME this is flow to disk work
 //            if( message.isDropped() )
@@ -120,9 +116,10 @@
     /**
      * @param message
      * @return true if the expression evaluates to Boolean.TRUE.
-     * @throws JMSException
+     * @throws AMQException
      */
-    public boolean matches(AMQMessage message) throws JMSException {
+    public boolean matches(AMQMessage message) throws AMQException
+    {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Wed Feb 14 12:02:03 2007
@@ -17,9 +17,9 @@
  */
 package org.apache.qpid.server.filter;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
 
-import javax.jms.JMSException;
 //
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
 //
@@ -35,7 +35,7 @@
         this.xpath = xpath;
     }
 
-    public Object evaluate(AMQMessage message) throws JMSException {
+    public Object evaluate(AMQMessage message) throws AMQException {
         return Boolean.FALSE;
     }
 
@@ -46,9 +46,10 @@
     /**
      * @param message
      * @return true if the expression evaluates to Boolean.TRUE.
-     * @throws JMSException
+     * @throws AMQException
      */
-    public boolean matches(AMQMessage message) throws JMSException {
+    public boolean matches(AMQMessage message) throws AMQException
+    {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Wed Feb 14 12:02:03 2007
@@ -21,23 +21,18 @@
 // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
 //
 
-import java.io.StringReader;
-import java.io.ByteArrayInputStream;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.TextMessage;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
-//import org.apache.activemq.command.Message;
-//import org.apache.activemq.util.ByteArrayInputStream;
-import org.apache.xpath.CachedXPathAPI;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.xpath.CachedXPathAPI;
 import org.w3c.dom.Document;
 import org.w3c.dom.traversal.NodeIterator;
 import org.xml.sax.InputSource;
 
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.ByteArrayInputStream;
+import java.io.StringReader;
+
 public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
     
     private final String xpath;
@@ -46,17 +41,24 @@
         this.xpath = xpath;
     }
     
-    public boolean evaluate(AMQMessage m) throws JMSException {
+    public boolean evaluate(AMQMessage m) throws AMQException
+    {
+        // TODO - we would have to check the content type and then evaluate the content
+        //        here... is this really a feature we wish to implement? - RobG
+        /*
+
         if( m instanceof TextMessage ) {
             String text = ((TextMessage)m).getText();
-            return evaluate(text);                
+            return evaluate(text);
         } else if ( m instanceof BytesMessage ) {
             BytesMessage bm = (BytesMessage) m;
             byte data[] = new byte[(int) bm.getBodyLength()];
             bm.readBytes(data);
             return evaluate(data);
-        }            
+        }
+        */
         return false;
+
     }
 
     private boolean evaluate(byte[] data) {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,15 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+import org.apache.log4j.Logger;
+
 public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
 {
     private static final Logger _logger = Logger.getLogger(ChannelCloseHandler.class);
@@ -43,16 +40,14 @@
         return _instance;
     }
 
-    private ChannelCloseHandler()
-    {
-    }
+    private ChannelCloseHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
     {
-        ChannelCloseBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final ChannelCloseBody body = evt.getMethod();
         _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " +
             body.classId + " and method " + body.methodId);
-        protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
+        session.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Wed Feb 14 12:02:03 2007
@@ -23,11 +23,10 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+
 import org.apache.log4j.Logger;
 
 public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
@@ -41,14 +40,12 @@
         return _instance;
     }
 
-    private ChannelCloseOkHandler()
-    {
-    }
+    private ChannelCloseOkHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
         _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
-        protocolSession.removeChannel(evt.getChannelId());
+        session.removeChannel(evt.getChannelId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,17 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.ChannelFlowBody;
 import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+import org.apache.log4j.Logger;
 
 public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
 {
@@ -44,24 +43,21 @@
         return _instance;
     }
 
-    private ChannelFlowHandler()
-    {
-    }
+    private ChannelFlowHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
     {
-        ChannelFlowBody body = evt.getMethod();
-
-        AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final ChannelFlowBody body = evt.getMethod();
+        AMQChannel channel = session.getChannel(evt.getChannelId());
         channel.setSuspended(!body.active);
         _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
 
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody response = ChannelFlowOkBody.createMethodBody(
-            protocolSession.getMajor(), // AMQP major version
-            protocolSession.getMinor(), // AMQP minor version
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion(), // AMQP minor version
             body.active);	// active
-        protocolSession.writeResponse(evt, response);
+        session.writeResponse(evt, response);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Wed Feb 14 12:02:03 2007
@@ -26,16 +26,17 @@
 import org.apache.qpid.framing.ChannelOpenOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
 
 public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
 {
+    //private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class);
+    
     private static ChannelOpenHandler _instance = new ChannelOpenHandler();
 
     public static ChannelOpenHandler getInstance()
@@ -43,19 +44,18 @@
         return _instance;
     }
 
-    private ChannelOpenHandler()
-    {
-    }
+    private ChannelOpenHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        VirtualHost virtualHost = session.getVirtualHost();
         // Be aware of possible changes to parameter order as versions change.
         // XXX: Client id
         AMQMethodBody response = ChannelOpenOkBody.createMethodBody(
-            protocolSession.getMajor(), // AMQP major version
-            protocolSession.getMinor(), // AMQP minor version
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion(), // AMQP minor version
             "XXX".getBytes());
-        protocolSession.writeResponse(evt, response);
+        session.writeResponse(evt, response);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,16 +20,14 @@
  */
 package org.apache.qpid.server.handler;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
 import org.apache.log4j.Logger;
 
 public class ConnectionCloseMethodHandler implements  StateAwareMethodListener<ConnectionCloseBody>
@@ -43,16 +41,14 @@
         return _instance;
     }
 
-    private ConnectionCloseMethodHandler()
-    {
-    }
+    private ConnectionCloseMethodHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
         final ConnectionCloseBody body = evt.getMethod();
         _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode +
-            "/" + body.replyText +  " for " + protocolSession);
-        protocolSession.closeSessionResponse(evt.getRequestId());
+            "/" + body.replyText +  " for " + session);
+        session.closeSessionResponse(evt.getRequestId());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -23,12 +23,10 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQState;
+
 import org.apache.log4j.Logger;
 
 public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
@@ -42,14 +40,12 @@
         return _instance;
     }
 
-    private ConnectionCloseOkMethodHandler()
-    {
-    }
+    private ConnectionCloseOkMethodHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
         _logger.info("Received Connection-close-ok");
-        protocolSession.closeSession();
+        session.closeSession();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -22,18 +22,23 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionOpenBody;
 import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
 
 public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
 {
+    //private static final Logger _logger = Logger.getLogger(ConnectionOpenMethodHandler.class);
+    
     private static ConnectionOpenMethodHandler _instance = new ConnectionOpenMethodHandler();
 
     public static ConnectionOpenMethodHandler getInstance()
@@ -41,32 +46,50 @@
         return _instance;
     }
 
-    private ConnectionOpenMethodHandler()
-    {
-    }
+    private ConnectionOpenMethodHandler() {}
 
-    private static String generateClientID()
+    private static AMQShortString generateClientID()
     {
-        return Long.toString(System.currentTimeMillis());
+        return new AMQShortString(Long.toString(System.currentTimeMillis()));
     }
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
     {
-        ConnectionOpenBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final ConnectionOpenBody body = evt.getMethod();
+
+        //ignore leading '/'
+        String virtualHostName;
+        if((body.virtualHost != null) && body.virtualHost.charAt(0) == '/')
+        {
+            virtualHostName = new StringBuilder(body.virtualHost.subSequence(1,body.virtualHost.length())).toString();
+        }
+        else
+        {
+            virtualHostName = String.valueOf(body.virtualHost);
+        }
+
+        VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
+
+        if(virtualHost == null)
+        {
+            throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName);
+        }
+        
+        session.setVirtualHost( virtualHost );
 
-        //todo //FIXME The virtual host must be validated by the server for the connection to open-ok
         // See Spec (0.8.2). Section  3.1.2 Virtual Hosts
-        if (protocolSession.getContextKey() == null)
+        if (session.getContextKey() == null)
         {
-            protocolSession.setContextKey(generateClientID());
+            session.setContextKey(generateClientID());
         }
+
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody response = ConnectionOpenOkBody.createMethodBody(
-            protocolSession.getMajor(), // AMQP major version
-            protocolSession.getMinor(), // AMQP minor version
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion(), // AMQP minor version
             body.virtualHost);	// knownHosts
-        protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN);
-        protocolSession.writeResponse(evt, response);
+        session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
+        session.writeResponse(evt, response);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -21,19 +21,22 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.HeartbeatConfig;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.AuthenticationManager;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+
 import org.apache.log4j.Logger;
 
 import javax.security.sasl.SaslServer;
@@ -50,35 +53,34 @@
         return _instance;
     }
 
-    private ConnectionSecureOkMethodHandler()
-    {
-    }
+    private ConnectionSecureOkMethodHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
     {
-        ConnectionSecureOkBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final ConnectionSecureOkBody body = evt.getMethod();
 
         AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
-        SaslServer ss = protocolSession.getSaslServer();
+        SaslServer ss = session.getSaslServer();
         if (ss == null)
         {
             throw new AMQException("No SASL context set up in session");
         }
 
         AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
-        AMQStateManager stateManager = protocolSession.getStateManager();
-        byte major = protocolSession.getMajor();
-        byte minor = protocolSession.getMinor();
+        byte major = session.getProtocolMajorVersion();
+        byte minor = session.getProtocolMinorVersion();
         switch (authResult.status)
         {
             case ERROR:
                 // Can't do this as we violate protocol. Need to send Close
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
-                disposeSaslServer(protocolSession);
-                protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+                stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                // Be aware of possible changes to parameter order as versions change.
+                session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
                     AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), body.getMethod());
+                disposeSaslServer(session);
                 break;
             case SUCCESS:
                 _logger.info("Connected as: " + ss.getAuthorizationID());
@@ -92,8 +94,8 @@
                     Integer.MAX_VALUE,	// channelMax
                     ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
                     HeartbeatConfig.getInstance().getDelay());	// heartbeat
-                protocolSession.writeResponse(evt, tune);
-                disposeSaslServer(protocolSession);
+                session.writeResponse(evt, tune);
+                disposeSaslServer(session);
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
@@ -101,7 +103,7 @@
                 AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
                     major, minor,	// AMQP version (major, minor)
                     authResult.challenge);	// challenge
-                protocolSession.writeResponse(evt, challenge);
+                session.writeResponse(evt, challenge);
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,14 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
-import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.ConnectionSecureBody;
 import org.apache.qpid.framing.ConnectionStartOkBody;
 import org.apache.qpid.framing.ConnectionTuneBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.HeartbeatConfig;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.AuthenticationManager;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -39,10 +35,12 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
-
 public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody>
 {
     private static final Logger _logger = Logger.getLogger(ConnectionStartOkMethodHandler.class);
@@ -56,13 +54,11 @@
         return _instance;
     }
 
-    private ConnectionStartOkMethodHandler()
-    {
-    }
+    private ConnectionStartOkMethodHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
         final ConnectionStartOkBody body = evt.getMethod();
         _logger.info("SASL Mechanism selected: " + body.mechanism);
         _logger.info("Locale selected: " + body.locale);
@@ -72,18 +68,17 @@
         SaslServer ss = null;
         try
         {
-            ss = authMgr.createSaslServer(body.mechanism, protocolSession.getLocalFQDN());
-            protocolSession.setSaslServer(ss);
+            ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+            session.setSaslServer(ss);
 
             AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
 
             //save clientProperties
-            if (protocolSession.getClientProperties() == null)
+            if (session.getClientProperties() == null)
             {
-                protocolSession.setClientProperties(body.clientProperties);
+                session.setClientProperties(body.clientProperties);
             }
 
-            AMQStateManager stateManager = protocolSession.getStateManager();
             switch (authResult.status)
             {
                 case ERROR:
@@ -94,26 +89,26 @@
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
                     // Be aware of possible changes to parameter order as versions change.
                     AMQMethodBody tune = ConnectionTuneBody.createMethodBody(
-                        protocolSession.getMajor(), // AMQP major version
-                        protocolSession.getMinor(), // AMQP minor version
+                        session.getProtocolMajorVersion(), // AMQP major version
+                        session.getProtocolMinorVersion(), // AMQP minor version
                         Integer.MAX_VALUE,	// channelMax
                         getConfiguredFrameSize(),	// frameMax
                         HeartbeatConfig.getInstance().getDelay());	// heartbeat
-                    protocolSession.writeRequest(evt.getChannelId(), tune, stateManager);
+                    session.writeRequest(evt.getChannelId(), tune, stateManager);
                     break;
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
                     // Be aware of possible changes to parameter order as versions change.
                     AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
-                        protocolSession.getMajor(), // AMQP major version
-                        protocolSession.getMinor(), // AMQP minor version
+                        session.getProtocolMajorVersion(), // AMQP major version
+                        session.getProtocolMinorVersion(), // AMQP minor version
                         authResult.challenge);	// challenge
-                    protocolSession.writeRequest(evt.getChannelId(), challenge, stateManager);
+                    session.writeRequest(evt.getChannelId(), challenge, stateManager);
             }
         }
         catch (SaslException e)
         {
-            disposeSaslServer(protocolSession);
+            disposeSaslServer(session);
             throw new AMQException("SASL error: " + e, e);
         }
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,17 +20,16 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+import org.apache.log4j.Logger;
+
 public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
 {
     private static final Logger _logger = Logger.getLogger(ConnectionTuneOkMethodHandler.class);
@@ -41,17 +40,19 @@
     {
         return _instance;
     }
+    
+    private ConnectionTuneOkMethodHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
     {
-        ConnectionTuneOkBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final ConnectionTuneOkBody body = evt.getMethod();
         if (_logger.isDebugEnabled())
         {
             _logger.debug(body);
         }
-        protocolSession.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
-        protocolSession.initHeartbeats(body.heartbeat);
-        protocolSession.setFrameMax(body.getFrameMax());
+        stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+        session.initHeartbeats(body.heartbeat);
+        session.setFrameMax(body.getFrameMax());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Wed Feb 14 12:02:03 2007
@@ -1,24 +1,28 @@
 /*
  *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ExchangeBoundBody;
 import org.apache.qpid.framing.ExchangeBoundOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -29,13 +33,13 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
 
-/**
- * @author Apache Software Foundation
- */
 public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
 {
-    private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+    //private static final Logger _logger = Logger.getLogger(ExchangeBoundHandler.class);
 
     public static final int OK = 0;
 
@@ -51,39 +55,42 @@
 
     public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
 
+    private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+
     public static ExchangeBoundHandler getInstance()
     {
         return _instance;
     }
 
-    private ExchangeBoundHandler()
-    {
-    }
+    private ExchangeBoundHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
     {
-        byte major = protocolSession.getMajor();
-        byte minor = protocolSession.getMinor();
-        
-        ExchangeBoundBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final ExchangeBoundBody body = evt.getMethod();
+        VirtualHost virtualHost = session.getVirtualHost();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
-        String exchangeName = body.exchange;
-        String queueName = body.queue;
-        String routingKey = body.routingKey;
+        byte major = session.getProtocolMajorVersion();
+        byte minor = session.getProtocolMinorVersion();
+        
+        AMQShortString exchangeName = body.exchange;
+        AMQShortString queueName = body.queue;
+        AMQShortString routingKey = body.routingKey;
         if (exchangeName == null)
         {
             throw new AMQException("Exchange exchange must not be null");
         }
-        Exchange exchange = protocolSession.getExchangeRegistry().getExchange(exchangeName);
+        Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
         AMQMethodBody response;
+
         if (exchange == null)
         {
             // AMQP version change:  Be aware of possible changes to parameter order as versions change.
             response = ExchangeBoundOkBody.createMethodBody
                 (major, minor,	// AMQP version (major, minor)
                  EXCHANGE_NOT_FOUND,	// replyCode
-                 "Exchange " + exchangeName + " not found");	// replyText
+                 new AMQShortString("Exchange " + exchangeName + " not found"));	// replyText
         }
         else if (routingKey == null)
         {
@@ -108,14 +115,14 @@
             }
             else
             {
-                AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
+                AMQQueue queue = queueRegistry.getQueue(queueName);
                 if (queue == null)
                 {
                     // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                     response = ExchangeBoundOkBody.createMethodBody
                         (major, minor,	// AMQP version (major, minor)
                          QUEUE_NOT_FOUND,	// replyCode
-                         "Queue " + queueName + " not found");	// replyText
+                         new AMQShortString("Queue " + queueName + " not found"));	// replyText
                 }
                 else
                 {
@@ -133,21 +140,22 @@
                         response = ExchangeBoundOkBody.createMethodBody
                             (major, minor,	// AMQP version (major, minor)
                              QUEUE_NOT_BOUND,	// replyCode
-                             "Queue " + queueName + " not bound to exchange " + exchangeName);	// replyText
+                             new AMQShortString("Queue " + queueName + " not bound to exchange " +
+                                exchangeName));	// replyText
                     }
                 }
             }
         }
         else if (queueName != null)
         {
-            AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
+            AMQQueue queue = queueRegistry.getQueue(queueName);
             if (queue == null)
             {
                 // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                 response = ExchangeBoundOkBody.createMethodBody
                     (major, minor,	// AMQP version (major, minor)
                      QUEUE_NOT_FOUND,	// replyCode
-                     "Queue " + queueName + " not found");	// replyText
+                     new AMQShortString("Queue " + queueName + " not found"));	// replyText
             }
             else
             {
@@ -165,8 +173,8 @@
                     response = ExchangeBoundOkBody.createMethodBody
                         (major, minor,	// AMQP version (major, minor)
                          SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
-                         "Queue " + queueName + " not bound with routing key " +
-                         body.routingKey + " to exchange " + exchangeName);	// replyText
+                         new AMQShortString("Queue " + queueName + " not bound with routing key " +
+                         body.routingKey + " to exchange " + exchangeName));	// replyText
                 }
             }
         }
@@ -186,10 +194,10 @@
                 response = ExchangeBoundOkBody.createMethodBody
                     (major, minor,	// AMQP version (major, minor)
                      NO_QUEUE_BOUND_WITH_RK,	// replyCode
-                     "No queue bound with routing key " + body.routingKey +
-                     " to exchange " + exchangeName);	// replyText
+                     new AMQShortString("No queue bound with routing key " + body.routingKey +
+                     " to exchange " + exchangeName));	// replyText
             }
         }
-        protocolSession.writeResponse(evt, response);
+        session.writeResponse(evt, response);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Wed Feb 14 12:02:03 2007
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -30,13 +29,14 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
 
 public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
 {
@@ -49,26 +49,26 @@
         return _instance;
     }
 
-    private final ExchangeFactory exchangeFactory;
-
-    private ExchangeDeclareHandler()
-    {
-        exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
-    }
+    private ExchangeDeclareHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
     {
+        AMQProtocolSession session = stateManager.getProtocolSession();
         final ExchangeDeclareBody body = evt.getMethod();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+        
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
         }
-        ExchangeRegistry exchangeRegistry = protocolSession.getExchangeRegistry();
         synchronized(exchangeRegistry)
         {
             Exchange exchange = exchangeRegistry.getExchange(body.exchange);
 
+
+
             if (exchange == null)
             {
                 if(body.passive && ((body.type == null) || body.type.length() ==0))
@@ -85,23 +85,24 @@
                     }
                     catch(AMQUnknownExchangeType e)
                     {
-                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange type: " + body.type,e);
+                        throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange type: " + body.type, e);
                     }
                 }
             }
             else if (!exchange.getType().equals(body.type))
             {
 
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());    
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange +
+                    " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());        
             }
         }
         if(!body.nowait)
         {
             // Be aware of possible changes to parameter order as versions change.
             AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor()); // AMQP minor version
-            protocolSession.writeResponse(evt, response);
+                session.getProtocolMajorVersion(), // AMQP major version
+                session.getProtocolMinorVersion()); // AMQP minor version
+            session.writeResponse(evt, response);
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Wed Feb 14 12:02:03 2007
@@ -21,19 +21,22 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ExchangeDeleteBody;
 import org.apache.qpid.framing.ExchangeDeleteOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.ExchangeInUseException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
 
 public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
 {
+    //private static final Logger _logger = Logger.getLogger(ExchangeDeleteHandler.class);
+
     private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
 
     public static ExchangeDeleteHandler getInstance()
@@ -41,21 +44,22 @@
         return _instance;
     }
 
-    private ExchangeDeleteHandler()
-    {
-    }
+    private ExchangeDeleteHandler() {}
 
-    public void methodReceived(AMQProtocolSession protocolSession,
-                               AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
     {
-        ExchangeDeleteBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final ExchangeDeleteBody body = evt.getMethod();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+
         try
         {
-            protocolSession.getExchangeRegistry().unregisterExchange(body.exchange, body.ifUnused);
+            exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody(
-                protocolSession.getMajor(), // AMQP major version
-                protocolSession.getMinor())); // AMQP minor version
+            session.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody(
+                session.getProtocolMajorVersion(), // AMQP major version
+                session.getProtocolMinorVersion())); // AMQP minor version
         }
         catch (ExchangeInUseException e)
         {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java Wed Feb 14 12:02:03 2007
@@ -22,15 +22,19 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageAppendHandler implements StateAwareMethodListener<MessageAppendBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageAppendHandler.class);
+
     private static MessageAppendHandler _instance = new MessageAppendHandler();
 
     public static MessageAppendHandler getInstance()
@@ -40,12 +44,14 @@
 
     private MessageAppendHandler() {}
     
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageAppendBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageAppendBody> evt) throws AMQException
     {
-		// TODO
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        AMQChannel channel = session.getChannel(evt.getChannelId());
+        channel.addMessageAppend(evt.getMethod());
+        session.writeResponse(evt, MessageOkBody.createMethodBody(
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion())); // AMQP minor version
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java Wed Feb 14 12:02:03 2007
@@ -26,14 +26,16 @@
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageCancelHandler implements StateAwareMethodListener<MessageCancelBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageCancelHandler.class);
+
     private static MessageCancelHandler _instance = new MessageCancelHandler();
 
     public static MessageCancelHandler getInstance()
@@ -42,21 +44,19 @@
     }
 
     private MessageCancelHandler() {}
-    
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageCancelBody> evt)
-                                throws AMQException
+     
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
     {
-        final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+        AMQProtocolSession session = stateManager.getProtocolSession();
         final MessageCancelBody body = evt.getMethod();
-        channel.unsubscribeConsumer(protocolSession, body.destination);
+        final AMQChannel channel = session.getChannel(evt.getChannelId());
+        channel.unsubscribeConsumer(session, body.destination);
         
         // Be aware of possible changes to parameter order as versions change.
         final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
-            protocolSession.getMajor(), // AMQP major version
-            protocolSession.getMinor()); // AMQP minor version
-        protocolSession.writeResponse(evt, methodBody);
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion()); // AMQP minor version
+        session.writeResponse(evt, methodBody);
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageCheckpointBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageCheckpointHandler implements StateAwareMethodListener<MessageCheckpointBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageCheckpointHandler.class);
+
     private static MessageCheckpointHandler _instance = new MessageCheckpointHandler();
 
     public static MessageCheckpointHandler getInstance()
@@ -40,10 +42,7 @@
 
     private MessageCheckpointHandler() {}
     
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageCheckpointBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageCheckpointBody> evt) throws AMQException
     {
 		// TODO
     }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java Wed Feb 14 12:02:03 2007
@@ -22,15 +22,19 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
+//import org.apache.log4j.Logger;
+
 public class MessageCloseHandler implements StateAwareMethodListener<MessageCloseBody>
 {
+    //private static final Logger _logger = Logger.getLogger(MessageCloseHandler.class);
+
     private static MessageCloseHandler _instance = new MessageCloseHandler();
 
     public static MessageCloseHandler getInstance()
@@ -39,13 +43,15 @@
     }
 
     private MessageCloseHandler() {}
-    
-    
-    public void methodReceived (AMQProtocolSession protocolSession,
-                               	AMQMethodEvent<MessageCloseBody> evt)
-                                throws AMQException
+       
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageCloseBody> evt) throws AMQException
     {
-		// TODO
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        AMQChannel channel = session.getChannel(evt.getChannelId());
+        channel.addMessageClose(evt.getMethod());
+        session.writeResponse(evt, MessageOkBody.createMethodBody(
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion())); // AMQP minor version
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java Wed Feb 14 12:02:03 2007
@@ -20,9 +20,9 @@
  */
 package org.apache.qpid.server.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQConstant;
@@ -32,7 +32,11 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
 
 public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
 {
@@ -47,13 +51,13 @@
 
     private MessageConsumeHandler() {}
 
-
-    public void methodReceived (AMQProtocolSession session,
-                               	AMQMethodEvent<MessageConsumeBody> evt)
-                                throws AMQException
+    public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
     {
-        MessageConsumeBody body = evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final MessageConsumeBody body = evt.getMethod();
         final int channelId = evt.getChannelId();
+        VirtualHost virtualHost = session.getVirtualHost();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
         AMQChannel channel = session.getChannel(channelId);
         if (channel == null)
@@ -63,32 +67,33 @@
         }
         else
         {
-            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : session.getQueueRegistry().getQueue(body.queue);
+            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
 
             if (queue == null)
             {
                 _log.info("No queue for '" + body.queue + "'");
-                if(body.queue!=null)
+                if(body.queue != null)
                 {
                     session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
-                        "No such queue, '" + body.queue + "'");
+                        new AMQShortString("No such queue, '" + body.queue + "'"));
                 }
                 else
                 {
                     session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
-                        "No queue name provided, no default queue defined.", body.getClazz(), body.getMethod());
+                        new AMQShortString("No queue name provided, no default queue defined."),
+                        body.getClazz(), body.getMethod());
                 }
             }
             else
             {
                 try
                 {
-                    /*AMQShort*/String destination = channel.subscribeToQueue
+                    AMQShortString destination = channel.subscribeToQueue
                         (body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal, body.exclusive);
                     // Be aware of possible changes to parameter order as versions change.
                     session.writeResponse(evt, MessageOkBody.createMethodBody(
-                        session.getMajor(), // AMQP major version
-                        session.getMinor())); // AMQP minor version
+                        session.getProtocolMajorVersion(), // AMQP major version
+                        session.getProtocolMinorVersion())); // AMQP minor version
 
                     //now allow queue to start async processing of any backlog of messages
                     queue.deliverAsync();
@@ -97,13 +102,14 @@
                 {
                     _log.info("Closing connection due to invalid selector: " + ise.getMessage());
                     session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
-                        ise.getMessage());
+                        new AMQShortString(ise.getMessage()));
                 }
                 catch (ConsumerTagNotUniqueException e)
                 {
                     _log.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage());
                     session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
-                        "Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod());
+                        new AMQShortString("Non-unique consumer tag, '" + body.destination + "'"),
+                        body.getClazz(), body.getMethod());
                 }
                 catch (AMQQueue.ExistingExclusiveSubscription e)
                 {