You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/12 23:02:13 UTC

svn commit: r495754 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/state/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apache/qpi...

Author: kpvdr
Date: Fri Jan 12 14:02:11 2007
New Revision: 495754

URL: http://svn.apache.org/viewvc?view=rev&rev=495754
Log:
Created common AMQMethodListener class, allowing the Request and Response managers to use a common interface to dispatch events to both the client and the server. Refactoring of both the client and broker AMQStateManagers and AMQProtocolSession classes was performed. The refactoring has run aground in the clustering, however, and this still needs to be resolved. As the cluster tests are currently disabled (by whom, I'm not sure), this does not disrupt the overall test result. JIRAs will be opened for this issue.

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
      - copied, changed from r495583, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
Removed:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Jan 12 14:02:11 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 
@@ -99,10 +100,19 @@
                                   AMQCodecFactory codecFactory)
             throws AMQException
     {
-        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+        _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this);
+        _minaProtocolSession = session;
+        session.setAttachment(this);
+        
+        _queueRegistry = queueRegistry;
+        _exchangeRegistry = exchangeRegistry;
+        _codecFactory = codecFactory;
+        _managedObject = createMBean();
+        _managedObject.register();
+//        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
     }
 
-    public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+     public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
                                   AMQCodecFactory codecFactory, AMQStateManager stateManager)
             throws AMQException
     {
@@ -208,13 +218,13 @@
                                                                                     (AMQMethodBody) frame.bodyFrame);
         try
         {
-            boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry);
+            boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
 
             if(!_frameListeners.isEmpty())
             {
                 for (AMQMethodListener listener : _frameListeners)
                 {
-                    wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) ||
+                    wasAnyoneInterested = listener.methodReceived(evt) ||
                                           wasAnyoneInterested;
                 }
             }
@@ -233,7 +243,7 @@
             _logger.error("Closing connection due to: " + e.getMessage());
             writeFrame(e.getCloseFrame(frame.channel));
         }        
-        catch (AMQException e)
+        catch (Exception e)
         {
             _stateManager.error(e);
             for (AMQMethodListener listener : _frameListeners)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Jan 12 14:02:11 2007
@@ -25,7 +25,7 @@
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.handler.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.log4j.Logger;
@@ -43,7 +43,9 @@
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
-
+    private final QueueRegistry _queueRegistry;
+    private final ExchangeRegistry _exchangeRegistry;
+    private final AMQProtocolSession _protocolSession;
     /**
      * The current state
      */
@@ -58,13 +60,16 @@
 
     private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
 
-    public AMQStateManager()
+    public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true);
+        this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession);
     }
 
-    protected AMQStateManager(AMQState initial, boolean register)
+    protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
+        _queueRegistry = queueRegistry;
+        _exchangeRegistry = exchangeRegistry;
+        _protocolSession = protocolSession;
         _currentState = initial;
         if (register)
         {
@@ -149,7 +154,7 @@
         }
     }
 
-    public void error(AMQException e)
+    public void error(Exception e)
     {
         _logger.error("State manager received error notification: " + e, e);
         for (StateListener l : _stateListeners)
@@ -158,15 +163,12 @@
         }
     }
 
-    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
-                           AMQProtocolSession protocolSession,
-                           QueueRegistry queueRegistry,
-                           ExchangeRegistry exchangeRegistry) throws AMQException
+    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
         StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
-            handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt);
+            handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt);
             return true;
         }
         return false;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Fri Jan 12 14:02:11 2007
@@ -89,7 +89,7 @@
             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
             // a slightly more complex state model therefore I felt it was worthwhile doing this.
             AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
-            _amqProtocolHandler.setStateManager(new AMQStateManager());
+            _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
             {
                 _amqProtocolHandler.setStateManager(existingStateManager);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Jan 12 14:02:11 2007
@@ -40,6 +40,7 @@
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.ssl.BogusSSLContextFactory;
 
 import java.util.Iterator;
@@ -68,7 +69,7 @@
      */
     private volatile AMQProtocolSession _protocolSession;
 
-    private AMQStateManager _stateManager = new AMQStateManager();
+//    private AMQStateManager _stateManager = new AMQStateManager();
 
     private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
 
@@ -277,7 +278,7 @@
      */
     public void propagateExceptionToWaiters(Exception e)
     {
-        _stateManager.error(e);
+        _protocolSession.getStateManager().error(e);
         if(!_frameListeners.isEmpty())
         {
             final Iterator it = _frameListeners.iterator();
@@ -316,14 +317,14 @@
             try
             {
 
-                boolean wasAnyoneInterested = _stateManager.methodReceived(evt, _protocolSession);
+                boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt);
                 if(!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
                     while (it.hasNext())
                     {
                         final AMQMethodListener listener = (AMQMethodListener) it.next();
-                        wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
+                        wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
                     }
                 }
                 if (!wasAnyoneInterested)
@@ -333,7 +334,7 @@
             }
             catch (AMQException e)
             {
-                _stateManager.error(e);
+                _protocolSession.getStateManager().error(e);
                 if(!_frameListeners.isEmpty())
                 {
                     Iterator it = _frameListeners.iterator();
@@ -394,7 +395,7 @@
   */
     public void attainState(AMQState s) throws AMQException
     {
-        _stateManager.attainState(s);
+        _protocolSession.getStateManager().attainState(s);
     }
 
     /**
@@ -486,7 +487,7 @@
 
     public void closeConnection() throws AMQException
     {
-        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+        _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -556,12 +557,17 @@
 
     public AMQStateManager getStateManager()
     {
-        return _stateManager;
+        return _protocolSession.getStateManager();
     }
 
     public void setStateManager(AMQStateManager stateManager)
     {
-        _stateManager = stateManager;
+        _protocolSession.setStateManager(stateManager);
+    }
+    
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
     }
 
     FailoverState getFailoverState()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Jan 12 14:02:11 2007
@@ -33,6 +33,7 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.commons.lang.StringUtils;
 
 import javax.jms.JMSException;
@@ -63,6 +64,8 @@
 
     protected final IoSession _minaProtocolSession;
 
+    private AMQStateManager _stateManager;
+
     protected WriteFuture _lastWriteFuture;
 
     /**
@@ -98,6 +101,7 @@
     {
         _protocolHandler = null;
         _minaProtocolSession = null;
+        _stateManager = new AMQStateManager(this);
     }
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -106,6 +110,7 @@
         _minaProtocolSession = protocolSession;
         // properties of the connection are made available to the event handlers
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+        _stateManager = new AMQStateManager(this);
     }
 
     public void init()
@@ -135,6 +140,16 @@
     public void setClientID(String clientID) throws JMSException
     {
         getAMQConnection().setClientID(clientID);
+    }
+    
+    public AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
+    
+    public void setStateManager(AMQStateManager stateManager)
+    {
+        _stateManager = stateManager;
     }
 
     public String getVirtualHost()

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Fri Jan 12 14:02:11 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 
 public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@
      * @return true if the listener has dealt with this frame
      * @throws AMQException
      */
-    public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
     {
         AMQMethodBody method = evt.getMethod();
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Jan 12 14:02:11 2007
@@ -23,8 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.handler.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.protocol.AMQMethodListener;
 import org.apache.qpid.framing.*;
 import org.apache.log4j.Logger;
 
@@ -41,6 +41,7 @@
 public class AMQStateManager implements AMQMethodListener
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+    private final AMQProtocolSession _protocolSession;
 
     /**
      * The current state
@@ -55,13 +56,14 @@
 
     private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
 
-    public AMQStateManager()
+    public AMQStateManager(AMQProtocolSession protocolSession)
     {
-        this(AMQState.CONNECTION_NOT_STARTED, true);
+        this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
     }
 
-    protected AMQStateManager(AMQState state, boolean register)
+    protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
     {
+        _protocolSession = protocolSession;
         _currentState = state;
         if(register)
         {
@@ -147,12 +149,12 @@
         }
     }
 
-    public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+    public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
         StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
         if (handler != null)
         {
-            handler.methodReceived(this, protocolSession, evt);
+            handler.methodReceived(this, _protocolSession, evt);
             return true;
         }
         return false;

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java Fri Jan 12 14:02:11 2007
@@ -52,7 +52,7 @@
     public void handle(int channel, AMQMethodBody method) throws AMQException
     {
         AMQMethodEvent evt = new AMQMethodEvent(channel, method);
-        _stateMgr.methodReceived(evt, _session);
+        _stateMgr.methodReceived(evt);
     }
 
     private class SessionAdapter extends AMQProtocolSession

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java Fri Jan 12 14:02:11 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.IllegalStateTransitionException;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.*;
 
 import java.util.HashMap;
@@ -43,9 +44,9 @@
     private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
     private final MemberHandle _identity;
 
-    protected ClientHandlerRegistry(MemberHandle local)
+    protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession)
     {
-        super(AMQState.CONNECTION_NOT_STARTED, false);
+        super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
 
         _identity = local;
 

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java Fri Jan 12 14:02:11 2007
@@ -46,7 +46,7 @@
 
     ServerHandlerRegistry getHandlerRegistry()
     {
-        return new ServerHandlerRegistry(getHandlerFactory());
+        return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
     }
 
     private MethodHandlerFactory getHandlerFactory()

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java Fri Jan 12 14:02:11 2007
@@ -33,6 +33,7 @@
 import org.apache.qpid.framing.ClusterMembershipBody;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@
         _handlers = handler._handlers;
     }
 
-    protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+    protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
     {
-        new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+        new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
     }
 
     void connect(String join) throws Exception

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java Fri Jan 12 14:02:11 2007
@@ -37,10 +37,12 @@
 {
     private MemberHandle _peer;
 
-    public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager)
-            throws AMQException
+    public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
+//    public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
+//        ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
     {
         super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+//        super(session, queueRegistry, exchangeRegistry, codecFactory);
     }
 
     public boolean isPeerSession()

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java Fri Jan 12 14:02:11 2007
@@ -63,7 +63,7 @@
     {
         super(host, port);
         _local = local;
-        _legacyHandler = new ClientHandlerRegistry(local);
+        _legacyHandler = new ClientHandlerRegistry(local, null);
     }
 
     private void init(IoSession session)

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java?view=diff&rev=495754&r1=495753&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java Fri Jan 12 14:02:11 2007
@@ -27,6 +27,9 @@
 import org.apache.qpid.server.state.IllegalStateTransitionException;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -40,20 +43,23 @@
     private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
     private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
 
-    ServerHandlerRegistry()
+    ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+        AMQProtocolSession protocolSession)
     {
-        super(AMQState.CONNECTION_NOT_STARTED, false);
+        super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
     }
 
-    ServerHandlerRegistry(ServerHandlerRegistry s)
+    ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this();
+        this(queueRegistry, exchangeRegistry, protocolSession);
         _handlers.putAll(s._handlers);
     }
 
-    ServerHandlerRegistry(MethodHandlerFactory factory)
+    ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
+        ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
     {
-        this();
+        this(queueRegistry, exchangeRegistry, protocolSession);
         init(factory);
     }
 

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java (from r495583, incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?view=diff&rev=495754&p1=incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java&r1=495583&p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java&r2=495754
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java Fri Jan 12 14:02:11 2007
@@ -18,12 +18,8 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.protocol;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.framing.AMQMethodBody;
 
 /**
@@ -43,14 +39,11 @@
      * to all registered listeners using the error() method (see below) allowing them to
      * perform cleanup if necessary.
      */
-    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt,
-                           AMQProtocolSession protocolSession,
-                           QueueRegistry queueRegistry,
-                           ExchangeRegistry exchangeRegistry) throws AMQException;
+    <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception;
 
     /**
      * Callback when an error has occurred. Allows listeners to clean up.
      * @param e
      */
-    void error(AMQException e);
+    void error(Exception e);
 }