You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/15 15:20:39 UTC

svn commit: r496326 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/state/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apa...

Author: kpvdr
Date: Mon Jan 15 06:20:37 2007
New Revision: 496326

URL: http://svn.apache.org/viewvc?view=rev&rev=496326
Log:
Merged the trunk refactoring to a common AMQMethodListener interface, plus Robert Godfrey's race condition fix. This opens the way for the Request and Response managers to use a common event dispatch mechanism for both client and server.

Added:
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
      - copied, changed from r496305, incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
Removed:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 15 06:20:37 2007
@@ -42,6 +42,7 @@
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
@@ -111,7 +112,16 @@
                                   AMQCodecFactory codecFactory)
             throws AMQException
     {
-        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+        _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+        _minaProtocolSession = session;
+        session.setAttachment(this);
+        _frameListeners.add(_stateManager);
+        _queueRegistry = queueRegistry;
+        _exchangeRegistry = exchangeRegistry;
+        _codecFactory = codecFactory;
+        _managedObject = createMBean();
+        _managedObject.register();
+//        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
     }
 
     public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -263,7 +273,7 @@
 //             boolean wasAnyoneInterested = false;
 //             for (AMQMethodListener listener : _frameListeners)
 //             {
-//                 wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
+//                 wasAnyoneInterested = listener.methodReceived(evt) ||
 //                                       wasAnyoneInterested;
 //             }
 //             if (!wasAnyoneInterested)
@@ -276,7 +286,7 @@
 //             _logger.error("Closing channel due to: " + e.getMessage());
 //             writeFrame(e.getCloseFrame(frame.channel));
 //         }
-//         catch (AMQException e)
+//         catch (Exception e)
 //         {
 //             for (AMQMethodListener listener : _frameListeners)
 //             {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Mon Jan 15 06:20:37 2007
@@ -25,7 +25,7 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.handler.*;
-import org.apache.qpid.server.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.log4j.Logger;
@@ -44,6 +44,9 @@
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
 
+    private final QueueRegistry _queueRegistry;
+    private final ExchangeRegistry _exchangeRegistry;
+    private final AMQProtocolSession _protocolSession;
     /**
      * The current state
      */
@@ -58,13 +61,16 @@
 
     private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
 
-    public AMQStateManager()
+    public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true);
+        this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
     }
 
-    protected AMQStateManager(AMQState initial, boolean register)
+    protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
+        _queueRegistry = queueRegistry;
+        _exchangeRegistry = exchangeRegistry;
+        _protocolSession = protocolSession;
         _currentState = initial;
         if (register)
         {
@@ -158,7 +164,7 @@
         }
     }
 
-    public void error(AMQException e)
+    public void error(Exception e)
     {
         _logger.error("State manager received error notification: " + e, e);
         for (StateListener l : _stateListeners)
@@ -167,15 +173,12 @@
         }
     }
 
-    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
-                           AMQProtocolSession protocolSession,
-                           QueueRegistry queueRegistry,
-                           ExchangeRegistry exchangeRegistry) throws AMQException
+    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
         StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
-            handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt);
+            handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
             return true;
         }
         return false;

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Mon Jan 15 06:20:37 2007
@@ -89,7 +89,7 @@
             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
             // a slightly more complex state model therefore I felt it was worthwhile doing this.
             AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
-            _amqProtocolHandler.setStateManager(new AMQStateManager());
+            _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
             {
                 _amqProtocolHandler.setStateManager(existingStateManager);

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Jan 15 06:20:37 2007
@@ -46,6 +46,7 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.HeartbeatBody;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.ssl.BogusSSLContextFactory;
 
@@ -148,7 +149,7 @@
             session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
         }
 
-        _protocolSession = new AMQProtocolSession(this, session, _connection);
+        _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
         _protocolSession.init();
     }
 
@@ -284,7 +285,7 @@
      */
     public void propagateExceptionToWaiters(Exception e)
     {
-        _stateManager.error(e);
+        getStateManager().error(e);
         final Iterator it = _frameListeners.iterator();
         while (it.hasNext())
         {
@@ -321,11 +322,11 @@
                 while (it.hasNext())
                 {
                     final AMQMethodListener listener = (AMQMethodListener) it.next();
-                    wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
+                    wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
                 }
                 if (!wasAnyoneInterested)
                 {
-                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"  + _frameListeners);
                 }
             }
             catch (AMQException e)
@@ -383,7 +384,7 @@
 
     public void attainState(AMQState s) throws AMQException
     {
-        _stateManager.attainState(s);
+        getStateManager().attainState(s);
     }
 
     /**
@@ -471,7 +472,7 @@
 
     public void closeConnection() throws AMQException
     {
-        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -547,6 +548,12 @@
     public void setStateManager(AMQStateManager stateManager)
     {
         _stateManager = stateManager;
+        _protocolSession.setStateManager(stateManager);
+    }
+    
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
     }
 
     FailoverState getFailoverState()

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Jan 15 06:20:37 2007
@@ -31,6 +31,7 @@
 import org.apache.qpid.client.ConnectionTuneParameters;
 import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
 import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -66,6 +67,8 @@
     protected static final String SASL_CLIENT = "SASLClient";
 
     protected final IoSession _minaProtocolSession;
+ 
+    private AMQStateManager _stateManager;
 
     protected WriteFuture _lastWriteFuture;
 
@@ -102,6 +105,7 @@
     {
         _protocolHandler = null;
         _minaProtocolSession = null;
+        _stateManager = new AMQStateManager(this);
     }
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -110,6 +114,19 @@
         _minaProtocolSession = protocolSession;
         // properties of the connection are made available to the event handlers
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+        _stateManager = new AMQStateManager(this);
+    }
+ 
+    public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+    {
+        _protocolHandler = protocolHandler;
+        _minaProtocolSession = protocolSession;
+        // properties of the connection are made available to the event handlers
+        _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+
+        _stateManager = stateManager;
+        _stateManager.setProtocolSession(this);
+                
     }
 
     public void init()
@@ -139,6 +156,16 @@
     public void setClientID(String clientID) throws JMSException
     {
         getAMQConnection().setClientID(clientID);
+    }
+    
+    public AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
+    
+    public void setStateManager(AMQStateManager stateManager)
+    {
+        _stateManager = stateManager;
     }
 
     public String getVirtualHost()

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Mon Jan 15 06:20:37 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 
 public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@
      * @return true if the listener has dealt with this frame
      * @throws AMQException
      */
-    public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
     {
         AMQMethodBody method = evt.getMethod();
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Mon Jan 15 06:20:37 2007
@@ -22,8 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.client.handler.*;
-import org.apache.qpid.client.protocol.AMQMethodListener;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.*;
 import org.apache.log4j.Logger;
@@ -41,6 +41,7 @@
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+    private AMQProtocolSession _protocolSession;
 
     /**
      * The current state
@@ -54,14 +55,20 @@
     private final Map _state2HandlersMap = new HashMap();
 
     private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
-
+ 
     public AMQStateManager()
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true);
+        this(null);
     }
 
-    protected AMQStateManager(AMQState state, boolean register)
+    public AMQStateManager(AMQProtocolSession protocolSession)
     {
+        this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+    }
+
+    protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+    {
+        _protocolSession = protocolSession;
         _currentState = state;
         if(register)
         {
@@ -154,12 +161,12 @@
         }
     }
 
-    public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
     {
         StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
-            handler.methodReceived(this, protocolSession, evt);
+            handler.methodReceived(this, _protocolSession, evt);
             return true;
         }
         return false;
@@ -234,5 +241,15 @@
             sw.waituntilStateHasChanged();
         }
         // at this point the state will have changed.
+    }
+
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    public void setProtocolSession(AMQProtocolSession session)
+    {
+        _protocolSession = session;
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java Mon Jan 15 06:20:37 2007
@@ -52,7 +52,7 @@
     public void handle(int channel, AMQMethodBody method) throws AMQException
     {
         AMQMethodEvent evt = new AMQMethodEvent(channel, method);
-        _stateMgr.methodReceived(evt, _session);
+        _stateMgr.methodReceived(evt);
     }
 
     private class SessionAdapter extends AMQProtocolSession

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java Mon Jan 15 06:20:37 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.IllegalStateTransitionException;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.ConnectionCloseBody;
@@ -49,9 +50,9 @@
     private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
     private final MemberHandle _identity;
 
-    protected ClientHandlerRegistry(MemberHandle local)
+    protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession)
     {
-        super(AMQState.CONNECTION_NOT_STARTED, false);
+        super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
 
         _identity = local;
 

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java Mon Jan 15 06:20:37 2007
@@ -46,7 +46,8 @@
 
     ServerHandlerRegistry getHandlerRegistry()
     {
-        return new ServerHandlerRegistry(getHandlerFactory());
+        // TODO - FIX THIS!
+        return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
     }
 
     private MethodHandlerFactory getHandlerFactory()

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Mon Jan 15 06:20:37 2007
@@ -33,6 +33,7 @@
 import org.apache.qpid.framing.ClusterMembershipBody;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@
         _handlers = handler._handlers;
     }
 
-    protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+    protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
     {
-        new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+        new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
     }
 
     void connect(String join) throws Exception

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Mon Jan 15 06:20:37 2007
@@ -63,7 +63,8 @@
     {
         super(host, port);
         _local = local;
-        _legacyHandler = new ClientHandlerRegistry(local);
+        // TODO - FIX THIS
+        _legacyHandler = new ClientHandlerRegistry(local, null);
     }
 
     private void init(IoSession session)

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=496326&r1=496325&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java Mon Jan 15 06:20:37 2007
@@ -27,6 +27,9 @@
 import org.apache.qpid.server.state.IllegalStateTransitionException;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -40,20 +43,23 @@
     private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
     private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
 
-    ServerHandlerRegistry()
+    ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
++        AMQProtocolSession protocolSession)
     {
-        super(AMQState.CONNECTION_NOT_STARTED, false);
+        super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
     }
 
-    ServerHandlerRegistry(ServerHandlerRegistry s)
+    ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
++        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this();
+        this(queueRegistry, exchangeRegistry, protocolSession);
         _handlers.putAll(s._handlers);
     }
 
-    ServerHandlerRegistry(MethodHandlerFactory factory)
+    ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
++        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this();
+        this(queueRegistry, exchangeRegistry, protocolSession);
         init(factory);
     }
 

Copied: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java (from r496305, incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?view=diff&rev=496326&p1=incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java&r1=496305&p2=incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java&r2=496326
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java Mon Jan 15 06:20:37 2007
@@ -18,12 +18,8 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.protocol;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.framing.AMQMethodBody;
 
 /**
@@ -35,22 +31,18 @@
     /**
      * Invoked when a method frame has been received
      * @param evt the event that contains the method and channel
-     * @param protocolSession the protocol session associated with the event
      * @return true if the handler has processed the method frame, false otherwise. Note
      * that this does not prohibit the method event being delivered to subsequent listeners
      * but can be used to determine if nobody has dealt with an incoming method frame.
-     * @throws AMQException if an error has occurred. This exception will be delivered
+     * @throws Exception if an error has occurred. This exception will be delivered
      * to all registered listeners using the error() method (see below) allowing them to
      * perform cleanup if necessary.
      */
-    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
-                           AMQProtocolSession protocolSession,
-                           QueueRegistry queueRegistry,
-                           ExchangeRegistry exchangeRegistry) throws AMQException;
+    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
 
     /**
      * Callback when an error has occurred. Allows listeners to clean up.
      * @param e
      */
-    void error(AMQException e);
+    void error(Exception e);
 }