You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/07/20 02:15:07 UTC

cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode AbstractRemoteNode.java RemoteNode.java RemoteNodeConnection.java RemoteNodeMonitor.java NodeServer.java RemoteNodeManagerImpl.java RemoteNodeManager.java MessagingTransportFactory.java

gdamour     2004/07/19 17:15:07

  Modified:    sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network
                        RemoteNodeJoiner.java NodeServerImpl.java
                        RemoteNodeJoined.java
                        AbstractRemoteNodeConnection.java
                        NetworkTransportFactory.java
                        RemoteNodeJoinerConnection.java
                        RemoteNodeJoinedConnection.java
               sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode
                        MockNodeServer.java RemoteNodeManagerImplTest.java
                        MockMessagingTransportFactory.java
                        MockRemoteNode.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode
                        RemoteNode.java RemoteNodeConnection.java
                        RemoteNodeMonitor.java NodeServer.java
                        RemoteNodeManagerImpl.java RemoteNodeManager.java
                        MessagingTransportFactory.java
  Added:       sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network
                        CallbackSocketProtocol.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode
                        AbstractRemoteNode.java
  Log:
  Refactoring of the remote node layer. This new version defines only one connection per RemoteNode. A
  RemoteNodeConnection supports now an asynchronous callback when the underlying connection is closed. It is
  used to detect and remove a RemoteNode from its RemoteNodeManager when the connection fails.
  
  Revision  Changes    Path
  1.4       +67 -16    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoiner.java
  
  Index: RemoteNodeJoiner.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoiner.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- RemoteNodeJoiner.java	24 Jun 2004 23:39:03 -0000	1.3
  +++ RemoteNodeJoiner.java	20 Jul 2004 00:15:05 -0000	1.4
  @@ -17,14 +17,23 @@
   
   package org.apache.geronimo.messaging.remotenode.network;
   
  -import java.io.IOException;
  +import java.lang.reflect.InvocationTargetException;
   
  -import org.apache.geronimo.messaging.CommunicationException;
  +import org.apache.geronimo.messaging.Msg;
  +import org.apache.geronimo.messaging.MsgBody;
  +import org.apache.geronimo.messaging.MsgHeader;
  +import org.apache.geronimo.messaging.MsgHeaderConstants;
  +import org.apache.geronimo.messaging.NodeException;
   import org.apache.geronimo.messaging.NodeInfo;
  +import org.apache.geronimo.messaging.RequestSender;
  +import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
   import org.apache.geronimo.messaging.io.IOContext;
  -import org.apache.geronimo.messaging.remotenode.MessagingTransportFactory;
  -import org.apache.geronimo.messaging.remotenode.RemoteNode;
  +import org.apache.geronimo.messaging.remotenode.AbstractRemoteNode;
   import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
  +import org.apache.geronimo.network.SelectorManager;
  +
  +import EDU.oswego.cs.dl.util.concurrent.FutureResult;
  +import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
   
   /**
    * 
  @@ -32,23 +41,65 @@
    */
   public class RemoteNodeJoiner
       extends AbstractRemoteNode
  -    implements RemoteNode
   {
   
  -    private final MessagingTransportFactory connFactory;
  +    private final SelectorManager sm;
       
  -    public RemoteNodeJoiner(NodeInfo aNodeInfo, IOContext anIOContext,
  -        MessagingTransportFactory aFactory) {
  -        super(aNodeInfo, anIOContext);
  -        if ( null == aFactory ) {
  -            throw new IllegalArgumentException("Factory is required.");
  +    public RemoteNodeJoiner(NodeInfo aLocalNodeInfo, NodeInfo aRemoteNodeInfo, 
  +        IOContext anIOContext, SelectorManager aSelectorManager) {
  +        super(aLocalNodeInfo, aRemoteNodeInfo, anIOContext);
  +        if ( null == aSelectorManager ) {
  +            throw new IllegalArgumentException("SelectorManager is required");
           }
  -        connFactory = aFactory;
  +        sm = aSelectorManager;
       }
   
  -    public RemoteNodeConnection newConnection()
  -        throws IOException, CommunicationException {
  -        return connFactory.factoryRemoteNodeConnection(nodeInfo, ioContext);
  +    public void join() throws NodeException {
  +        RemoteNodeConnection connection = 
  +            new RemoteNodeJoinerConnection(remoteNodeInfo, ioContext, sm);
  +        setConnection(connection);
  +
  +        Msg msg = new Msg();
  +        MsgHeader header = msg.getHeader();
  +        header.addHeader(MsgHeaderConstants.SRC_NODE, localNodeInfo);
  +        header.addHeader(MsgHeaderConstants.DEST_NODE, remoteNodeInfo);
  +        
  +        // Only set to comply with a valid request. 
  +        header.addHeader(MsgHeaderConstants.DEST_NODES, remoteNodeInfo);
  +        header.addHeader(MsgHeaderConstants.SRC_ENDPOINT, "");
  +        header.addHeader(MsgHeaderConstants.CORRELATION_ID,
  +            new RequestSender.RequestID((byte) 0));
  +        header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST);
  +        header.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION, new Integer(0));
  +        
  +        msg.getBody().setContent(localNodeInfo);
  +
  +        final FutureResult result = new FutureResult();
  +        setMsgProducerOut(new MsgOutInterceptor() {
  +            public void push(Msg aMsg) {
  +                result.set(aMsg);
  +            }
  +        });
  +        getMsgConsumerOut().push(msg);
  +        Msg reply;
  +        try {
  +            // waits 3 seconds for a reply.
  +            reply = (Msg) result.get();
  +            reply = (Msg) result.timedGet(3000);
  +        } catch (TimeoutException e) {
  +            throw new NodeException("Join request submitted by " +
  +                localNodeInfo + " to " + remoteNodeInfo + " has timed out.");
  +        } catch (InterruptedException e) {
  +            throw new NodeException(e);
  +        } catch (InvocationTargetException e) {
  +            throw new NodeException(e);
  +        }
  +        Boolean isOK = (Boolean) reply.getBody().getContent();
  +        if ( Boolean.FALSE == isOK ) {
  +            throw new NodeException(remoteNodeInfo + " has refused the " +
  +                "join request submitted by " + localNodeInfo);
  +        }
  +        manager.registerRemoteNode(this);
       }
       
   }
  
  
  
  1.6       +19 -54    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NodeServerImpl.java
  
  Index: NodeServerImpl.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NodeServerImpl.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- NodeServerImpl.java	17 Jul 2004 03:45:41 -0000	1.5
  +++ NodeServerImpl.java	20 Jul 2004 00:15:05 -0000	1.6
  @@ -17,28 +17,20 @@
   
   package org.apache.geronimo.messaging.remotenode.network;
   
  -import java.io.IOException;
   import java.net.URI;
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -import org.apache.geronimo.messaging.CommunicationException;
  -import org.apache.geronimo.messaging.Msg;
  -import org.apache.geronimo.messaging.MsgBody;
  +import org.apache.geronimo.messaging.NodeException;
   import org.apache.geronimo.messaging.NodeInfo;
  -import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
   import org.apache.geronimo.messaging.io.IOContext;
   import org.apache.geronimo.messaging.remotenode.NodeServer;
  -import org.apache.geronimo.messaging.remotenode.RemoteNode;
  -import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
   import org.apache.geronimo.messaging.remotenode.RemoteNodeManager;
  -import org.apache.geronimo.messaging.remotenode.admin.JoinReply;
   import org.apache.geronimo.network.SelectorManager;
   import org.apache.geronimo.network.protocol.AcceptableProtocol;
   import org.apache.geronimo.network.protocol.ProtocolException;
   import org.apache.geronimo.network.protocol.ProtocolFactory;
   import org.apache.geronimo.network.protocol.ServerSocketAcceptor;
  -import org.apache.geronimo.network.protocol.SocketProtocol;
   import org.apache.geronimo.network.protocol.ProtocolFactory.AcceptedCallBack;
   import org.apache.geronimo.pool.ClockPool;
   
  @@ -86,11 +78,14 @@
           serverSocketAcceptor.setSelectorManager(selectorManager);
       }
   
  -    public void start() throws IOException, CommunicationException {
  +    public void start() throws NodeException {
  +        if ( null == manager ) {
  +            throw new IllegalStateException("Manager is not set.");
  +        }
           log.debug("Starting NodeServer.");
  -        SocketProtocol spt = new SocketProtocol();
  +        CallbackSocketProtocol spt = new CallbackSocketProtocol();
           // TODO configurable.
  -        spt.setTimeout(10 * 1000);
  +        spt.setTimeout(1000);
           spt.setSelectorManager(selectorManager);
   
           ProtocolFactory pf = new ProtocolFactory();
  @@ -98,7 +93,7 @@
           // TODO configurable.
           pf.setMaxAge(Long.MAX_VALUE);
           pf.setMaxInactivity(1 * 60 * 60 * 1000);
  -        pf.setReclaimPeriod(10 * 1000);
  +        pf.setReclaimPeriod(500);
           pf.setTemplate(spt);
           pf.setAcceptedCallBack(this);
   
  @@ -114,20 +109,16 @@
               serverSocketAcceptor.setUri(bindURI);
               serverSocketAcceptor.startup();
           } catch (Exception e) {
  -            IOException exception = new IOException("Can not start.");
  -            exception.initCause(e);
  -            throw exception;
  +            throw new NodeException("Can not start server", e);
           }
       }
   
  -    public void stop() throws IOException, CommunicationException {
  +    public void stop() {
           log.info("Stopping NodeServer.");
           try {
               serverSocketAcceptor.drain();
           } catch (Exception e) {
  -            IOException exception = new IOException("Can not stop.");
  -            exception.initCause(e);
  -            throw exception;
  +            log.error("Error stopping NodeServer", e);
           }
       }
   
  @@ -137,40 +128,14 @@
       
       public void accepted(AcceptableProtocol aProtocol)
           throws ProtocolException {
  -        new RemoteNodeInitializer(aProtocol);
  -    }
  -    
  -    private class RemoteNodeInitializer implements MsgOutInterceptor {
  -        private final RemoteNodeConnection connection;
  -        private RemoteNodeInitializer(AcceptableProtocol aProtocol)
  -            throws ProtocolException {
  -            connection =
  -                new RemoteNodeJoinedConnection(ioContext, aProtocol);
  -            try {
  -                connection.open();
  -            } catch (IOException e) {
  -                throw new ProtocolException(e);
  -            } catch (CommunicationException e) {
  -                throw new ProtocolException(e);
  -            }
  -            connection.setMsgProducerOut(this);
  -        }
  -        
  -        public void push(Msg aMsg) {
  -            MsgBody body = aMsg.getBody();
  -            NodeInfo otherNodeInfo = (NodeInfo) body.getContent();
  -            
  -            JoinReply joinReply = new JoinReply(aMsg);
  -            joinReply.execute(connection);
  -            
  -            RemoteNode remoteNode = manager.findRemoteNode(otherNodeInfo);
  -            if ( null == remoteNode ) {
  -                remoteNode = new RemoteNodeJoined(otherNodeInfo, ioContext);
  -            }
  -            remoteNode.addConnection(connection);
  -            manager.registerRemoteNode(remoteNode);
  +        RemoteNodeJoined remoteNode =
  +            new RemoteNodeJoined(nodeInfo, ioContext, aProtocol);
  +        remoteNode.setManager(manager);
  +        try {
  +            remoteNode.join();
  +        } catch (NodeException e) {
  +            log.error("Can not join node", e);
           }
  -
       }
       
   }
  
  
  
  1.3       +47 -8     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoined.java
  
  Index: RemoteNodeJoined.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoined.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- RemoteNodeJoined.java	3 Jun 2004 14:39:44 -0000	1.2
  +++ RemoteNodeJoined.java	20 Jul 2004 00:15:05 -0000	1.3
  @@ -17,10 +17,17 @@
   
   package org.apache.geronimo.messaging.remotenode.network;
   
  +import org.apache.commons.logging.Log;
  +import org.apache.commons.logging.LogFactory;
  +import org.apache.geronimo.messaging.Msg;
  +import org.apache.geronimo.messaging.MsgBody;
  +import org.apache.geronimo.messaging.NodeException;
   import org.apache.geronimo.messaging.NodeInfo;
  +import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
   import org.apache.geronimo.messaging.io.IOContext;
  -import org.apache.geronimo.messaging.remotenode.RemoteNode;
  +import org.apache.geronimo.messaging.remotenode.AbstractRemoteNode;
   import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
  +import org.apache.geronimo.network.protocol.Protocol;
   
   /**
    * 
  @@ -28,16 +35,48 @@
    */
   public class RemoteNodeJoined
       extends AbstractRemoteNode
  -    implements RemoteNode
   {
  +    
  +    private static final Log log = LogFactory.getLog(RemoteNodeJoined.class);
   
  -    public RemoteNodeJoined(NodeInfo aNodeInfo, IOContext anIOContext) {
  -        super(aNodeInfo, anIOContext);
  +    private final Protocol protocol;
  +    
  +    
  +    public RemoteNodeJoined(NodeInfo aLocalNode, IOContext anIOContext, 
  +        Protocol aProtocol) {
  +        super(aLocalNode, anIOContext);
  +        protocol = aProtocol;
       }
       
  -    public RemoteNodeConnection newConnection() {
  -        throw new UnsupportedOperationException(
  -            "A joined node does not create connections");
  +    public void join() throws NodeException {
  +        RemoteNodeConnection connection =
  +            new RemoteNodeJoinedConnection(ioContext, protocol);
  +        setConnection(connection);
  +        
  +        setMsgProducerOut(new JoinExecutor());
  +    }
  +    
  +    private class JoinExecutor implements MsgOutInterceptor {
  +
  +        public void push(Msg aMsg) {
  +            MsgBody body = aMsg.getBody();
  +            remoteNodeInfo = (NodeInfo) body.getContent();
  +            
  +            if ( null != manager.findRemoteNode(remoteNodeInfo) ) {
  +                log.error(remoteNodeInfo + 
  +                    " tried to join twice this node; rejecting request.");
  +                Msg msg = aMsg.reply();
  +                msg.getBody().setContent(Boolean.FALSE);
  +                getMsgConsumerOut().push(msg);
  +                return;
  +            }
  +            Msg msg = aMsg.reply();
  +            msg.getBody().setContent(Boolean.TRUE);
  +            getMsgConsumerOut().push(msg);
  +
  +            manager.registerRemoteNode(RemoteNodeJoined.this);
  +        }
  +        
       }
   
   }
  
  
  
  1.2       +53 -17    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/AbstractRemoteNodeConnection.java
  
  Index: AbstractRemoteNodeConnection.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/AbstractRemoteNodeConnection.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- AbstractRemoteNodeConnection.java	11 May 2004 12:06:42 -0000	1.1
  +++ AbstractRemoteNodeConnection.java	20 Jul 2004 00:15:05 -0000	1.2
  @@ -19,7 +19,9 @@
   
   import java.io.IOException;
   
  -import org.apache.geronimo.messaging.CommunicationException;
  +import org.apache.commons.logging.Log;
  +import org.apache.commons.logging.LogFactory;
  +import org.apache.geronimo.messaging.NodeException;
   import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
   import org.apache.geronimo.messaging.io.IOContext;
   import org.apache.geronimo.messaging.io.PopSynchronization;
  @@ -27,6 +29,7 @@
   import org.apache.geronimo.messaging.io.ReplacerResolver;
   import org.apache.geronimo.messaging.io.StreamManager;
   import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
  +import org.apache.geronimo.messaging.remotenode.network.CallbackSocketProtocol.SocketProtocolListener;
   import org.apache.geronimo.network.protocol.Protocol;
   import org.apache.geronimo.network.protocol.ProtocolException;
   
  @@ -39,11 +42,14 @@
       implements RemoteNodeConnection
   {
   
  +    private static final Log log = LogFactory.getLog(AbstractRemoteNodeConnection.class);
  +    
       private final IOContext ioContext;
       protected Protocol protocol;
       private MsgOutInterceptor msgOut;
       private ProtocolInDispatcher inDispatcher;
  -
  +    private LifecycleListener listener;
  +    
       public AbstractRemoteNodeConnection(IOContext anIOContext) {
           if ( null == anIOContext ) {
               throw new IllegalArgumentException("IOContext is required.");
  @@ -67,36 +73,66 @@
           return msgOut;
       }
       
  -    public void open() throws IOException, CommunicationException {
  +    public void open() throws NodeException {
           try {
               protocol = newProtocol();
           } catch (ProtocolException e) {
  -            IOException exception = new IOException("Can not create Protocol.");
  -            exception.initCause(e);
  -            throw exception;
  +            throw new NodeException("Can not create protocol", e);
  +        }
  +        Protocol curProtocol = protocol;
  +        while ( null != curProtocol ) {
  +            if ( curProtocol instanceof CallbackSocketProtocol ) {
  +                ((CallbackSocketProtocol) curProtocol).setListener(
  +                    new SocketProtocolListener() {
  +                        public void onClose() {
  +                            msgOut = null;
  +                            inDispatcher = null;
  +                            if ( null != listener ) {
  +                                listener.onClose();
  +                            }
  +                        }
  +                    });
  +                break;
  +            }
  +            curProtocol = curProtocol.getDownProtocol();
  +        }
  +        if ( false == curProtocol instanceof CallbackSocketProtocol ) {
  +            throw new AssertionError("No CallbackSocketProtocol.");
           }
  +        
           StreamManager streamManager = ioContext.getStreamManager();
           ReplacerResolver replacerResolver = ioContext.getReplacerResolver();
  -        PushSynchronization pushSynchronization = ioContext.getPushSynchronization();
  -        PopSynchronization popSynchronization = ioContext.getPopSynchronization();
  -        msgOut = new ProtocolOutInterceptor(protocol,
  -            streamManager, pushSynchronization, replacerResolver);
  -        inDispatcher = new ProtocolInDispatcher(protocol,
  -            streamManager, popSynchronization, replacerResolver);
  +        PushSynchronization pushSynchronization = 
  +            ioContext.getPushSynchronization();
  +        PopSynchronization popSynchronization = 
  +            ioContext.getPopSynchronization();
  +        try {
  +            msgOut = new ProtocolOutInterceptor(protocol,
  +                streamManager, pushSynchronization, replacerResolver);
  +            inDispatcher = new ProtocolInDispatcher(protocol,
  +                streamManager, popSynchronization, replacerResolver);
  +        } catch (IOException e) {
  +            throw new NodeException("Can not set-up IO context.", e);
  +        }
       }
   
       protected abstract Protocol newProtocol() throws ProtocolException;
       
  -    public void close() throws IOException, CommunicationException {
  +    public void close() {
           msgOut = null;
           inDispatcher = null;
           try {
               protocol.drain();
           } catch (ProtocolException e) {
  -            IOException exception = new IOException("Can not drain Protocol");
  -            exception.initCause(e);
  -            throw exception;
  +            log.error("Error when closing connection", e);
  +        }
  +        if ( null != listener ) {
  +            listener.onClose();
           }
       }
   
  +    public void setLifecycleListener(LifecycleListener aListener) {
  +        listener = aListener;
  +    }
  +    
   }
  
  
  
  1.5       +6 -21     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NetworkTransportFactory.java
  
  Index: NetworkTransportFactory.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NetworkTransportFactory.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- NetworkTransportFactory.java	8 Jul 2004 05:13:29 -0000	1.4
  +++ NetworkTransportFactory.java	20 Jul 2004 00:15:05 -0000	1.5
  @@ -19,14 +19,11 @@
   
   import org.apache.geronimo.gbean.GBeanInfo;
   import org.apache.geronimo.gbean.GBeanInfoFactory;
  -import org.apache.geronimo.gbean.GBeanLifecycle;
  -import org.apache.geronimo.gbean.WaitingException;
   import org.apache.geronimo.messaging.NodeInfo;
   import org.apache.geronimo.messaging.io.IOContext;
   import org.apache.geronimo.messaging.remotenode.MessagingTransportFactory;
   import org.apache.geronimo.messaging.remotenode.NodeServer;
   import org.apache.geronimo.messaging.remotenode.RemoteNode;
  -import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
   import org.apache.geronimo.network.SelectorManager;
   import org.apache.geronimo.pool.ClockPool;
   
  @@ -36,7 +33,7 @@
    * @version $Revision$ $Date$
    */
   public class NetworkTransportFactory
  -    implements GBeanLifecycle, MessagingTransportFactory
  +    implements MessagingTransportFactory
   {
       
       private final SelectorManager sm;
  @@ -51,28 +48,16 @@
           cp = aClockPool;
       }
       
  -    public void doStart() throws WaitingException, Exception {
  -    }
  -
  -    public void doStop() throws WaitingException, Exception {
  -    }
  -
  -    public void doFail() {
  -    }
  -    
       public NodeServer factoryServer(NodeInfo aNodeInfo, IOContext anIOContext) {
           return new NodeServerImpl(aNodeInfo, anIOContext, sm, cp);
       }
       
  -    public RemoteNode factoryRemoteNode(NodeInfo aNodeInfo, IOContext anIOContext) {
  -        return new RemoteNodeJoiner(aNodeInfo, anIOContext, this);
  +    public RemoteNode factoryRemoteNode(NodeInfo aLocalNodeInfo,
  +        NodeInfo aRemoteNodeInfo, IOContext anIOContext) {
  +        return new RemoteNodeJoiner(aLocalNodeInfo, aRemoteNodeInfo, 
  +            anIOContext, sm);
       }
   
  -    public RemoteNodeConnection factoryRemoteNodeConnection(
  -        NodeInfo aNodeInfo, IOContext anIOContext) {
  -        return new RemoteNodeJoinerConnection(aNodeInfo, anIOContext, sm);
  -    }
  -    
       public static final GBeanInfo GBEAN_INFO;
   
       static {
  
  
  
  1.4       +3 -4      incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinerConnection.java
  
  Index: RemoteNodeJoinerConnection.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinerConnection.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- RemoteNodeJoinerConnection.java	17 Jul 2004 03:45:41 -0000	1.3
  +++ RemoteNodeJoinerConnection.java	20 Jul 2004 00:15:05 -0000	1.4
  @@ -25,7 +25,6 @@
   import org.apache.geronimo.network.SelectorManager;
   import org.apache.geronimo.network.protocol.Protocol;
   import org.apache.geronimo.network.protocol.ProtocolException;
  -import org.apache.geronimo.network.protocol.SocketProtocol;
   
   /**
    * 
  @@ -60,9 +59,9 @@
           String hostName = nodeInfo.getAddress().getHostName();
           int port = nodeInfo.getPort();
   
  -        SocketProtocol socketProtocol = new SocketProtocol();
  +        CallbackSocketProtocol socketProtocol = new CallbackSocketProtocol();
           // TODO configurable.
  -        socketProtocol.setTimeout(10 * 1000);
  +        socketProtocol.setTimeout(1000);
           socketProtocol.setInterface(new InetSocketAddress(hostName, 0));
           socketProtocol.setAddress(new InetSocketAddress(hostName, port));
           socketProtocol.setSelectorManager(selectorManager);
  
  
  
  1.2       +1 -3      incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinedConnection.java
  
  Index: RemoteNodeJoinedConnection.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinedConnection.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- RemoteNodeJoinedConnection.java	11 May 2004 12:06:42 -0000	1.1
  +++ RemoteNodeJoinedConnection.java	20 Jul 2004 00:15:05 -0000	1.2
  @@ -18,7 +18,6 @@
   package org.apache.geronimo.messaging.remotenode.network;
   
   import org.apache.geronimo.messaging.io.IOContext;
  -import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
   import org.apache.geronimo.network.protocol.Protocol;
   import org.apache.geronimo.network.protocol.ProtocolException;
   
  @@ -28,7 +27,6 @@
    */
   public class RemoteNodeJoinedConnection
       extends AbstractRemoteNodeConnection
  -    implements RemoteNodeConnection
   {
   
       public RemoteNodeJoinedConnection(IOContext anIOContext,
  
  
  
  1.1                  incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/CallbackSocketProtocol.java
  
  Index: CallbackSocketProtocol.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.messaging.remotenode.network;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.geronimo.network.protocol.Protocol;
  import org.apache.geronimo.network.protocol.SocketProtocol;
  
  /**
   * SocketProtocol providing asynchronous callbacks upon closure.
   *
   * @version $Revision: 1.1 $ $Date: 2004/07/20 00:15:05 $
   */
  public class CallbackSocketProtocol
      extends SocketProtocol
  {
      
      private Log log = LogFactory.getLog(CallbackSocketProtocol.class);
  
      private static int nextConnectionId = 0;
  
      /**
       * To be notified when the socket is closed.
       */
      private SocketProtocolListener listener;
  
      private synchronized static int getNextConnectionId() {
          return nextConnectionId++;
      }
  
      public void close() {
          super.close();
          if ( null != listener ) {
              listener.onClose();
          }
      }
  
      /**
       * Gets the listener to be notified upon closure of the underlying socket.
       * 
       * @return Listener.
       */
      public SocketProtocolListener getListener() {
          return listener;
      }
      
      /**
       * Sets the listener.
       * 
       * @param aListener Listener.
       */
      public void setListener(SocketProtocolListener aListener) {
          listener = aListener;
      }
  
      public Protocol cloneProtocol() throws CloneNotSupportedException {
          CallbackSocketProtocol p = (CallbackSocketProtocol) super.clone();
          p.log = LogFactory.getLog(CallbackSocketProtocol.class.getName() + ":" + getNextConnectionId());
          return p;
      }
  
  
      /**
       * When the underlying socket is closed, this callback is called.
       *
       * @version $Revision: 1.1 $ $Date: 2004/07/20 00:15:05 $
       */
      public interface SocketProtocolListener {
          
          public void onClose();
          
      }
      
  }
  
  
  
  1.2       +24 -4     incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockNodeServer.java
  
  Index: MockNodeServer.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockNodeServer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MockNodeServer.java	11 May 2004 12:06:43 -0000	1.1
  +++ MockNodeServer.java	20 Jul 2004 00:15:06 -0000	1.2
  @@ -1,15 +1,35 @@
  +/**
  + *
  + * Copyright 2004 The Apache Software Foundation
  + *
  + *  Licensed under the Apache License, Version 2.0 (the "License");
  + *  you may not use this file except in compliance with the License.
  + *  You may obtain a copy of the License at
  + *
  + *     http://www.apache.org/licenses/LICENSE-2.0
  + *
  + *  Unless required by applicable law or agreed to in writing, software
  + *  distributed under the License is distributed on an "AS IS" BASIS,
  + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  + *  See the License for the specific language governing permissions and
  + *  limitations under the License.
  + */
  +
   package org.apache.geronimo.messaging.remotenode;
   
  -import java.io.IOException;
  +import org.apache.geronimo.messaging.NodeException;
   
  -import org.apache.geronimo.messaging.CommunicationException;
   
  +/**
  + *
  + * @version $Revision$ $Date$
  + */
   public class MockNodeServer implements NodeServer {
   
  -    public void start() throws IOException, CommunicationException {
  +    public void start() throws NodeException {
       }
   
  -    public void stop() throws IOException, CommunicationException {
  +    public void stop() {
       }
   
       public void setRemoteNodeManager(RemoteNodeManager aManager) {
  
  
  
  1.5       +125 -27   incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImplTest.java
  
  Index: RemoteNodeManagerImplTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImplTest.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- RemoteNodeManagerImplTest.java	8 Jul 2004 05:13:29 -0000	1.4
  +++ RemoteNodeManagerImplTest.java	20 Jul 2004 00:15:06 -0000	1.5
  @@ -54,6 +54,29 @@
           cp.setPoolName("CP");
   
           manager = new RemoteNodeManagerImpl(nodeInfo1, ioContext, cp, factory);
  +        
  +        NodeTopology topology = new NodeTopology() {
  +            public int getVersion() {
  +                return 0;
  +            }
  +            public Set getNeighbours(NodeInfo aRoot) {
  +                return new HashSet();
  +            }
  +            public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
  +                return null;
  +            }
  +            public int getIDOfNode(NodeInfo aNodeInfo) {
  +                throw new UnsupportedOperationException("getVersion");
  +            }
  +            public NodeInfo getNodeById(int anId) {
  +                throw new UnsupportedOperationException("getVersion");
  +            }
  +            public Set getNodes() {
  +                throw new UnsupportedOperationException("getVersion");
  +            }
  +        };
  +        manager.prepareTopology(topology);
  +        manager.commitTopology();
       }
       
       public void testRegisterRemoteNode() throws Exception {
  @@ -89,82 +112,148 @@
           assertEquals(remoteNode, listener.event.getRemoteNode());
       }
       
  -    public void testGetMsgOut() throws Exception {
  +    private TestGetMsgOutInfo newGetMsgOutInfo() throws Exception {
  +        final TestGetMsgOutInfo info = new TestGetMsgOutInfo();
  +
           InetAddress address = InetAddress.getLocalHost();
  -        final NodeInfo srcNode = new NodeInfo("SrcNode1", address, 8081);
  -        final NodeInfo node1 = new NodeInfo("Node1", address, 8081);
  -        final NodeInfo node2 = new NodeInfo("Node2", address, 8081);
  +        info.srcNode = new NodeInfo("SrcNode1", address, 8081);
  +        info.node1 = new NodeInfo("Node1", address, 8081);
  +        info.node2 = new NodeInfo("Node2", address, 8081);
           
           MockRemoteNode remoteNode1 = new MockRemoteNode();
  -        remoteNode1.setNodeInfo(node1);
  +        remoteNode1.setNodeInfo(info.node1);
           manager.registerRemoteNode(remoteNode1);
           
           MockRemoteNode remoteNode2 = new MockRemoteNode();
  -        remoteNode2.setNodeInfo(node2);
  +        remoteNode2.setNodeInfo(info.node2);
           manager.registerRemoteNode(remoteNode2);
   
  -        NodeTopology topology = new NodeTopology() {
  +        info.topology = new NodeTopology() {
               public Set getNeighbours(NodeInfo aRoot) {
                   Set result = new HashSet();
  -                result.add(node1);
  -                result.add(node2);
  +                result.add(info.node1);
  +                result.add(info.node2);
                   return result;
               }
               public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
  -                if ( aSource.equals(srcNode) && aTarget.equals(node1) ) {
  -                    return new NodeInfo[] {node1};
  -                } else if ( aSource.equals(srcNode) && aTarget.equals(node2) ) {
  -                    return new NodeInfo[] {node2};
  +                if ( aSource.equals(info.srcNode) && 
  +                    aTarget.equals(info.node1) ) {
  +                    return new NodeInfo[] {info.node1};
  +                } else if ( aSource.equals(info.srcNode) && 
  +                    aTarget.equals(info.node2) ) {
  +                    return new NodeInfo[] {info.node2};
                   }
                   return null;
               }
               public int getIDOfNode(NodeInfo aNodeInfo) {
  -                return 0;
  +                throw new UnsupportedOperationException("getIDOfNode");
               }
               public NodeInfo getNodeById(int anId) {
  -                return null;
  +                throw new UnsupportedOperationException("getNodeById");
               }
               public Set getNodes() {
  -                return null;
  +                throw new UnsupportedOperationException("getNodes");
               }
               public int getVersion() {
  -                return 0;
  +                return 1;
               }
           };
  -        manager.setTopology(topology);
  +        // Test that Msg are successfully routed within the context of
  +        // a prepared topology.
  +        manager.prepareTopology(info.topology);
  +        
  +        info.remoteNode1 = remoteNode1;
  +        info.remoteNode2 = remoteNode2;
  +        return info;
  +    }
  +
  +    /**
  +     * Test that Msg are successfully routed within the context of a prepared 
  +     * topology.
  +     */
  +    public void testPreparedGetMsgOut() throws Exception {
  +        TestGetMsgOutInfo info = newGetMsgOutInfo();
  +
  +        MsgOutInterceptor out = manager.getMsgConsumerOut();
  +        Msg msg = new Msg();
  +        MsgHeader header = msg.getHeader();
  +        Integer id = new Integer(1234);
  +        header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
  +        header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
  +        header.addHeader(MsgHeaderConstants.DEST_NODES, info.node1);
  +        header.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
  +            new Integer(info.topology.getVersion()));
  +        out.push(msg);
  +        
  +        List receivedMsgs = info.remoteNode1.getPushedMsg();
  +        assertEquals(1, receivedMsgs.size());
  +        msg = (Msg) receivedMsgs.get(0);
  +        assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
  +        receivedMsgs.clear();
  +        
  +        receivedMsgs = info.remoteNode2.getPushedMsg();
  +        assertEquals(0, receivedMsgs.size());
  +        
  +        msg = new Msg();
  +        header = msg.getHeader();
  +        header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
  +        header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
  +        header.addHeader(MsgHeaderConstants.DEST_NODES, info.node2);
  +        header.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
  +            new Integer(info.topology.getVersion()));
  +        out.push(msg);
  +        
  +        receivedMsgs = info.remoteNode2.getPushedMsg();
  +        assertEquals(1, receivedMsgs.size());
  +        msg = (Msg) receivedMsgs.get(0);
  +        assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
  +        receivedMsgs.clear();
  +        
  +        receivedMsgs = info.remoteNode1.getPushedMsg();
  +        assertEquals(0, receivedMsgs.size());
  +    }
  +    
  +    /**
  +     * Test that Msg are successfully routed within the context of a committed 
  +     * topology.
  +     */
  +    public void testCommittedGetMsgOut() throws Exception {
  +        TestGetMsgOutInfo info = newGetMsgOutInfo();
  +        
  +        manager.commitTopology();
   
           MsgOutInterceptor out = manager.getMsgConsumerOut();
           Msg msg = new Msg();
           MsgHeader header = msg.getHeader();
           Integer id = new Integer(1234);
           header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
  -        header.addHeader(MsgHeaderConstants.SRC_NODE, srcNode);
  -        header.addHeader(MsgHeaderConstants.DEST_NODES, node1);
  +        header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
  +        header.addHeader(MsgHeaderConstants.DEST_NODES, info.node1);
           out.push(msg);
           
  -        List receivedMsgs = remoteNode1.getPushedMsg();
  +        List receivedMsgs = info.remoteNode1.getPushedMsg();
           assertEquals(1, receivedMsgs.size());
           msg = (Msg) receivedMsgs.get(0);
           assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
           receivedMsgs.clear();
           
  -        receivedMsgs = remoteNode2.getPushedMsg();
  +        receivedMsgs = info.remoteNode2.getPushedMsg();
           assertEquals(0, receivedMsgs.size());
           
           msg = new Msg();
           header = msg.getHeader();
           header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
  -        header.addHeader(MsgHeaderConstants.SRC_NODE, srcNode);
  -        header.addHeader(MsgHeaderConstants.DEST_NODES, node2);
  +        header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
  +        header.addHeader(MsgHeaderConstants.DEST_NODES, info.node2);
           out.push(msg);
           
  -        receivedMsgs = remoteNode2.getPushedMsg();
  +        receivedMsgs = info.remoteNode2.getPushedMsg();
           assertEquals(1, receivedMsgs.size());
           msg = (Msg) receivedMsgs.get(0);
           assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
           receivedMsgs.clear();
           
  -        receivedMsgs = remoteNode1.getPushedMsg();
  +        receivedMsgs = info.remoteNode1.getPushedMsg();
           assertEquals(0, receivedMsgs.size());
       }
       
  @@ -173,6 +262,15 @@
           public void fireRemoteNodeEvent(RemoteNodeEvent anEvent) {
               event = anEvent;
           }
  +    }
  +    
  +    private class TestGetMsgOutInfo {
  +        private NodeTopology topology;
  +        private MockRemoteNode remoteNode1;
  +        private MockRemoteNode remoteNode2;
  +        private NodeInfo srcNode;
  +        private NodeInfo node1;
  +        private NodeInfo node2;
       }
       
   }
  
  
  
  1.3       +23 -7     incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockMessagingTransportFactory.java
  
  Index: MockMessagingTransportFactory.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockMessagingTransportFactory.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- MockMessagingTransportFactory.java	24 Jun 2004 23:39:03 -0000	1.2
  +++ MockMessagingTransportFactory.java	20 Jul 2004 00:15:06 -0000	1.3
  @@ -1,8 +1,29 @@
  +/**
  + *
  + * Copyright 2004 The Apache Software Foundation
  + *
  + *  Licensed under the Apache License, Version 2.0 (the "License");
  + *  you may not use this file except in compliance with the License.
  + *  You may obtain a copy of the License at
  + *
  + *     http://www.apache.org/licenses/LICENSE-2.0
  + *
  + *  Unless required by applicable law or agreed to in writing, software
  + *  distributed under the License is distributed on an "AS IS" BASIS,
  + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  + *  See the License for the specific language governing permissions and
  + *  limitations under the License.
  + */
  +
   package org.apache.geronimo.messaging.remotenode;
   
   import org.apache.geronimo.messaging.NodeInfo;
   import org.apache.geronimo.messaging.io.IOContext;
   
  +/**
  + *
  + * @version $Revision$ $Date$
  + */
   public class MockMessagingTransportFactory
       implements MessagingTransportFactory
   {
  @@ -19,13 +40,8 @@
           return server;
       }
   
  -    public RemoteNode factoryRemoteNode(NodeInfo aNodeInfo, IOContext anIOContext) {
  -        return null;
  -    }
  -
  -    public RemoteNodeConnection factoryRemoteNodeConnection(
  -        NodeInfo aNodeInfo,
  -        IOContext anIOContext) {
  +    public RemoteNode factoryRemoteNode(NodeInfo aLocalNodeInfo,
  +        NodeInfo aRemoteNodeInfo, IOContext anIOContext) {
           return null;
       }
   
  
  
  
  1.4       +7 -12     incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockRemoteNode.java
  
  Index: MockRemoteNode.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockRemoteNode.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MockRemoteNode.java	10 Jun 2004 23:12:25 -0000	1.3
  +++ MockRemoteNode.java	20 Jul 2004 00:15:06 -0000	1.4
  @@ -17,12 +17,11 @@
   
   package org.apache.geronimo.messaging.remotenode;
   
  -import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;
   
  -import org.apache.geronimo.messaging.CommunicationException;
   import org.apache.geronimo.messaging.Msg;
  +import org.apache.geronimo.messaging.NodeException;
   import org.apache.geronimo.messaging.NodeInfo;
   import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
   
  @@ -49,17 +48,13 @@
           return nodeInfo;
       }
   
  -    public RemoteNodeConnection newConnection() throws IOException, CommunicationException {
  -        return null;
  +    public void setManager(RemoteNodeManager aManager) {
       }
  -
  -    public void leave() throws IOException, CommunicationException {
  -    }
  -
  -    public void addConnection(RemoteNodeConnection aConnection) {
  +    
  +    public void leave() {
       }
  -
  -    public void removeConnection(RemoteNodeConnection aConnection) {
  +    
  +    public void join() throws NodeException {
       }
   
       public void setMsgProducerOut(MsgOutInterceptor aMsgOut) {
  
  
  
  1.3       +11 -25    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNode.java
  
  Index: RemoteNode.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNode.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- RemoteNode.java	3 Jun 2004 14:39:44 -0000	1.2
  +++ RemoteNode.java	20 Jul 2004 00:15:06 -0000	1.3
  @@ -17,10 +17,9 @@
   
   package org.apache.geronimo.messaging.remotenode;
   
  -import java.io.IOException;
   
  -import org.apache.geronimo.messaging.CommunicationException;
   import org.apache.geronimo.messaging.MsgConsProd;
  +import org.apache.geronimo.messaging.NodeException;
   import org.apache.geronimo.messaging.NodeInfo;
   
   /**
  @@ -40,36 +39,23 @@
       public NodeInfo getNodeInfo();
   
       /**
  -     * Returns a connection to the remote node.
  -     * <BR>
  -     * This connection is not opened.
  +     * Sets the manager of this remote node.
        * 
  -     * @throws IOException Indicates an I/O problem.
  -     * @throws CommunicationException If a communication can not be established.
  +     * @param aManager Manager.
        */
  -    public RemoteNodeConnection newConnection()
  -        throws IOException, CommunicationException;
  -
  +    public void setManager(RemoteNodeManager aManager);
  +    
       /**
        * Leaves the remote node.
  -     * 
  -     * @throws IOException Indicates an I/O problem.
  -     * @throws CommunicationException If a communication can not be established.
        */
  -    public void leave() throws IOException, CommunicationException;
  +    public void leave();
   
       /**
  -     * Adds a connection.
  -     * 
  -     * @param aConnection Connection to be added to the RemoteNode.
  -     */
  -    public void addConnection(RemoteNodeConnection aConnection);
  -    
  -    /**
  -     * Removes a connection.
  +     * Joins the remote node.
        * 
  -     * @param aConnection Connection to be removed.
  +     * @exception NodeException Indicates that the remote node can not be 
  +     * joined.
        */
  -    public void removeConnection(RemoteNodeConnection aConnection);
  +    public void join() throws NodeException;
       
   }
  
  
  
  1.2       +20 -9     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeConnection.java
  
  Index: RemoteNodeConnection.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeConnection.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- RemoteNodeConnection.java	11 May 2004 12:06:42 -0000	1.1
  +++ RemoteNodeConnection.java	20 Jul 2004 00:15:06 -0000	1.2
  @@ -17,10 +17,9 @@
   
   package org.apache.geronimo.messaging.remotenode;
   
  -import java.io.IOException;
   
  -import org.apache.geronimo.messaging.CommunicationException;
   import org.apache.geronimo.messaging.MsgConsProd;
  +import org.apache.geronimo.messaging.NodeException;
   
   /**
    * Connection to a remote node.
  @@ -34,17 +33,29 @@
       /**
        * Opens the connection.
        * 
  -     * @throws IOException Indicates an I/O problem.
  -     * @throws CommunicationException If a communication can not be established.
  +     * @throws NodeException Indicates that the connection can not be opened.
        */
  -    public void open() throws IOException, CommunicationException;
  +    public void open() throws NodeException;
       
       /**
        * Closes the connection.
  +     */
  +    public void close();
  +    
  +    /**
  +     * Sets the listener to be notified when the connection is closed.
        * 
  -     * @throws IOException Indicates an I/O problem.
  -     * @throws CommunicationException If a communication can not be established.
  +     * @param aListener Listener.
        */
  -    public void close() throws IOException, CommunicationException;
  +    public void setLifecycleListener(LifecycleListener aListener);
  +    
  +    /**
  +     * Callback interface to be notified when the connection is closed.
  +     */
  +    public interface LifecycleListener {
  +
  +        public void onClose();
  +        
  +    }
       
   }
  
  
  
  1.4       +2 -7      incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeMonitor.java
  
  Index: RemoteNodeMonitor.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeMonitor.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- RemoteNodeMonitor.java	17 Jul 2004 03:48:57 -0000	1.3
  +++ RemoteNodeMonitor.java	20 Jul 2004 00:15:06 -0000	1.4
  @@ -24,7 +24,6 @@
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  -import org.apache.geronimo.messaging.NodeException;
   import org.apache.geronimo.messaging.NodeInfo;
   import org.apache.geronimo.pool.ClockPool;
   
  @@ -144,11 +143,7 @@
                   Long lastActivity = (Long)entry.getValue();
                   if ( lastActivity.longValue() <
                       System.currentTimeMillis() + IDLE_TIME ) {
  -                    try {
  -                        manager.leaveRemoteNode(node);
  -                    } catch (NodeException e) {
  -                        log.error("Can not leave " + node, e);
  -                    }
  +                    manager.leaveRemoteNode(node);
                       iter.remove();
                   }
               }
  
  
  
  1.2       +9 -12     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/NodeServer.java
  
  Index: NodeServer.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/NodeServer.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- NodeServer.java	11 May 2004 12:06:42 -0000	1.1
  +++ NodeServer.java	20 Jul 2004 00:15:06 -0000	1.2
  @@ -17,9 +17,8 @@
   
   package org.apache.geronimo.messaging.remotenode;
   
  -import java.io.IOException;
   
  -import org.apache.geronimo.messaging.CommunicationException;
  +import org.apache.geronimo.messaging.NodeException;
   
   /**
    * A NodeServer listens for remote nodes and delegates to a
  @@ -33,22 +32,20 @@
       /**
        * Start the server.
        * 
  -     * @throws IOException Indicates an I/O problem.
  -     * @throws CommunicationException If a communication can not be established.
  +     * @throws NodeException If the server can not be started.
  +     * @exception IllegalStateException Indicates that no RemoteNodeManger has
  +     * been set.
        */
  -    public void start() throws IOException, CommunicationException;
  +    public void start() throws NodeException, IllegalStateException;
   
       /**
        * Stop the server.
  -     * 
  -     * @throws IOException Indicates an I/O problem.
  -     * @throws CommunicationException If a communication can not be established.
        */
  -    public void stop() throws IOException, CommunicationException;
  +    public void stop();
   
       /**
  -     * Sets the RemoteNodeManager in charge of managing the remote node, which
  -     * have join this server.
  +     * Sets the RemoteNodeManager in charge of managing the remote nodes, which
  +     * have joined this server.
        * <BR>
        * A NodeServer must notify this RemoteNodeManager when a new connection
        * abstracting a remote note has joined it.
  
  
  
  1.8       +100 -113  incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImpl.java
  
  Index: RemoteNodeManagerImpl.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImpl.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- RemoteNodeManagerImpl.java	17 Jul 2004 03:49:29 -0000	1.7
  +++ RemoteNodeManagerImpl.java	20 Jul 2004 00:15:06 -0000	1.8
  @@ -17,12 +17,10 @@
   
   package org.apache.geronimo.messaging.remotenode;
   
  -import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.HashMap;
  -import java.util.HashSet;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
  @@ -38,7 +36,6 @@
   import org.apache.geronimo.messaging.NodeTopology;
   import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
   import org.apache.geronimo.messaging.io.IOContext;
  -import org.apache.geronimo.messaging.remotenode.admin.JoinRequest;
   import org.apache.geronimo.pool.ClockPool;
   
   /**
  @@ -67,6 +64,11 @@
        */
       private NodeTopology topology;
       
  +    /**
  +     * Topology being prepared.
  +     */
  +    private NodeTopology preparedTopology;
  +    
       public RemoteNodeManagerImpl(NodeInfo aNodeInfo, IOContext anIOContext,
           ClockPool aClockPool, MessagingTransportFactory aFactory) {
           if ( null == aNodeInfo ) {
  @@ -90,100 +92,81 @@
   
       public void start() throws NodeException {
           log.info("Starting RemoteNodeManager for node {" + nodeInfo + "}");
  -        try {
  -            server = factory.factoryServer(nodeInfo, ioContext);
  -            server.setRemoteNodeManager(this);
  -            server.start();
  -        } catch (IOException e) {
  -            throw new NodeException("Can not start server.", e);
  -        } catch (CommunicationException e) {
  -            throw new NodeException("Can not start server.", e);
  -        }
  +        server = factory.factoryServer(nodeInfo, ioContext);
  +        server.setRemoteNodeManager(this);
  +        server.start();
           remoteNodeMonitor.start();
       }
       
       public void stop() throws NodeException {
           log.info("Stopping RemoteNodeManager for node {" + nodeInfo + "}");
           remoteNodeMonitor.stop();
  +        server.stop();
  +        Collection nodes;
           synchronized(remoteNodes) {
  -            for (Iterator iter = remoteNodes.values().iterator(); iter.hasNext();) {
  -                RemoteNode node = (RemoteNode) iter.next();
  -                try {
  -                    node.leave();
  -                } catch (IOException e) {
  -                    log.error(e);
  -                } catch (CommunicationException e) {
  -                    log.error(e);
  -                } finally {
  -                    node.setMsgProducerOut(null);
  -                    iter.remove();
  -                }
  -            }
  +            nodes = new ArrayList(remoteNodes.values());
           }
  -        try {
  -            server.stop();
  -        } catch (IOException e) {
  -            throw new NodeException("Can not stop NodeServer.", e);
  -        } catch (CommunicationException e) {
  -            throw new NodeException("Can not stop NodeServer.", e);
  +        for (Iterator iter = nodes.iterator(); iter.hasNext();) {
  +            RemoteNode node = (RemoteNode) iter.next();
  +            node.leave();
  +            node.setMsgProducerOut(null);
           }
       }
   
  -    public void setTopology(NodeTopology aTopology) {
  -        Set neighbours = aTopology.getNeighbours(nodeInfo);
  -        
  -        // Makes sure that one does not try to remove the new neighbours 
  -        // during the reconfiguration.
  -        remoteNodeMonitor.unscheduleNodeDeletion(neighbours);
  -
  -        Set newNeighbours = new HashSet();
  +    public void prepareTopology(NodeTopology aTopology) throws NodeException {
           Set oldNeighbours;
           if ( null == topology ) {
               oldNeighbours = Collections.EMPTY_SET;
           } else {
               oldNeighbours = topology.getNeighbours(nodeInfo);
           }
  -        // Tries to join all the neighbours declared by the specified
  -        // topology.
  -        for (Iterator iter = neighbours.iterator(); iter.hasNext();) {
  +        // Computes the new neighbours
  +        Set newNeighbours = aTopology.getNeighbours(nodeInfo);
  +        newNeighbours.removeAll(oldNeighbours);
  +
  +        // Makes sure that one does not drop them during the reconfiguration.
  +        remoteNodeMonitor.unscheduleNodeDeletion(newNeighbours);
  +
  +        Exception exception = null;
  +        // Joins all the new neighbours
  +        for (Iterator iter = newNeighbours.iterator(); iter.hasNext();) {
               NodeInfo node = (NodeInfo) iter.next();
  -            if ( !oldNeighbours.contains(node) ) {
  -                try {
  -                    findOrJoinRemoteNode(node);
  -                    newNeighbours.add(node);
  -                } catch (NodeException e) {
  -                    log.error("Can not apply topology change", e);
  -                    break;
  -                } catch (CommunicationException e) {
  -                    log.error("Can not apply topology change", e);
  -                    break;
  -                }
  +            try {
  +                findOrJoinRemoteNode(node);
  +            } catch (NodeException e) {
  +                exception = e;
  +                break;
  +            } catch (CommunicationException e) {
  +                exception = e;
  +                break;
               }
  -            iter.remove();
  -            oldNeighbours.remove(node);
           }
  -        // One neighbour has not been joined successfully. Rolls-back the
  -        // physical connections created until now.
  -        if ( 0 < neighbours.size() ) {
  +        // One new neighbour has not been joined successfully. Rolls-back.
  +        if ( null != exception ) {
               for (Iterator iter = newNeighbours.iterator(); iter.hasNext();) {
                   NodeInfo node = (NodeInfo) iter.next();
  -                try {
  -                    leaveRemoteNode(node);
  -                } catch (NodeException e) {
  -                    log.error("Error rolling-back topology change", e);
  -                } catch (CommunicationException e) {
  -                    log.error("Error rolling-back topology change", e);
  -                }
  +                leaveRemoteNode(node);
               }
  -            throw new CommunicationException("Can not apply topology.");
  +            throw new NodeException("Can not apply topology.", exception);
  +        }
  +        preparedTopology = aTopology;
  +    }
  +    
  +    public void commitTopology() {
  +        Set oldNeighbours;
  +        if ( null == topology ) {
  +            oldNeighbours = Collections.EMPTY_SET;
  +        } else {
  +            oldNeighbours = topology.getNeighbours(nodeInfo);
           }
  +        // Computes the old neighbours
  +        Set newNeighbours = preparedTopology.getNeighbours(nodeInfo);
  +        oldNeighbours.removeAll(newNeighbours);
   
           // Schedules the deletion of the old neighbours.
           remoteNodeMonitor.scheduleNodeDeletion(oldNeighbours);
  -        // Ensures that the new neighbours will not be leaved.
  -        remoteNodeMonitor.unscheduleNodeDeletion(newNeighbours);
           
  -        topology = aTopology;
  +        topology = preparedTopology;
       }
       
       public void addListener(RemoteNodeEventListener aListener) {
  @@ -198,22 +181,14 @@
           }
       }
       
  -    public void leaveRemoteNode(NodeInfo aNodeInfo)
  -        throws NodeException {
  +    public void leaveRemoteNode(NodeInfo aNodeInfo) {
           synchronized(remoteNodes) {
  -            RemoteNode remoteNode = findRemoteNode(aNodeInfo);
  +            RemoteNode remoteNode = (RemoteNode) remoteNodes.get(aNodeInfo);
               if ( null == remoteNode ) {
                   return;
               }
  -            try {
  -                remoteNode.leave();
  -            } catch (IOException e) {
  -                throw new NodeException("Can not leave " + aNodeInfo, e);
  -            } catch (CommunicationException e) {
  -                throw new NodeException("Can not leave " + aNodeInfo, e);
  -            } finally {
  -                unregisterRemoteNode(remoteNode);
  -            }
  +            remoteNode.leave();
  +            unregisterRemoteNode(remoteNode);
           }
       }
       
  @@ -221,26 +196,14 @@
           throws NodeException {
           RemoteNode remoteNode;
           synchronized(remoteNodes) {
  -            remoteNode = findRemoteNode(aNodeInfo);
  +            remoteNode = (RemoteNode) remoteNodes.get(aNodeInfo);
               if ( null != remoteNode ) {
                   return remoteNode;
               }
  -            log.debug("Joining node {" + aNodeInfo + "}");
  -            remoteNode = factory.factoryRemoteNode(aNodeInfo, ioContext);
  -            RemoteNodeConnection connection;
  -            try {
  -                connection = remoteNode.newConnection();
  -                connection.open();
  -            } catch (IOException e) {
  -                throw new NodeException("Can not reach " + aNodeInfo, e);
  -            } catch (CommunicationException e) {
  -                throw new NodeException("Can not reach " + aNodeInfo, e);
  -            }
  -            JoinRequest joinRequest = new JoinRequest(nodeInfo, aNodeInfo);
  -            joinRequest.execute(connection);
  -
  -            remoteNode.addConnection(connection);
  -            registerRemoteNode(remoteNode);
  +            remoteNode =
  +                factory.factoryRemoteNode(nodeInfo, aNodeInfo, ioContext);
  +            remoteNode.setManager(this);
  +            remoteNode.join();
           }
           return remoteNode;
       }
  @@ -295,10 +258,6 @@
       private class RemoteNodeRouter implements MsgOutInterceptor {
   
           public void push(Msg aMsg) {
  -            if ( null == topology ) {
  -                throw new RuntimeException("No topology is set.");
  -            }
  -            
               MsgHeader header = aMsg.getHeader();
               Object destNode = header.getHeader(MsgHeaderConstants.DEST_NODES);
               if (destNode instanceof NodeInfo) {
  @@ -323,31 +282,59 @@
                           MsgHeaderConstants.DEST_NODE_PATH,
                           NodeInfo.pop(path));
                       RemoteNode remoteNode = findRemoteNode(target);
  +                    if ( null == remoteNode ) {
  +                        throw new CommunicationException(target +
  +                            " has failed during a topology reconfiguration.");
  +                    }
                       out = remoteNode.getMsgConsumerOut();
                   } else {
                       // A path has not already been computed. Computes one.
                       NodeInfo src = (NodeInfo)
                           header2.getHeader(MsgHeaderConstants.SRC_NODE);
  -                    path = topology.getPath(src, target);
  -                    if ( null == path ) {
  -                        throw new CommunicationException("{" + target +
  -                            "} is not reachable by {" + src + "}");
  +                    NodeTopology topo = markTopology(header2);
  +                    path = topo.getPath(src, target);
  +                    if (null == path) {
  +                        throw new CommunicationException("{" + target
  +                            + "} is not reachable by {" + src + 
  +                            "} in the topology " + topo);
                       }
  -                    NodeInfo tmpNode = path[0];
  -                    RemoteNode remoteNode = findRemoteNode(tmpNode);
  +                    RemoteNode remoteNode = findRemoteNode(path[0]);
                       if ( null == remoteNode ) {
  -                        throw new CommunicationException("{" + target +
  -                            "} is not reachable by {" + src + "}");
  +                        throw new CommunicationException(path[0] +
  +                            " has failed during a topology reconfiguration.");
                       }
                       out = remoteNode.getMsgConsumerOut();
                       
  -                    NodeInfo[] newPath = NodeInfo.pop(path);
                       // Inserts the computed path and the new dests.
  -                    header2.addHeader(MsgHeaderConstants.DEST_NODE_PATH, newPath);
  +                    header2.addHeader(MsgHeaderConstants.DEST_NODE_PATH, NodeInfo.pop(path));
                       header2.addHeader(MsgHeaderConstants.DEST_NODES, target);
                   }
                   out.push(msg2);
               }
  +        }
  +        
  +        /**
  +         * If the topology version is not set, then the Msg is sent in the
  +         * current topology.
  +         * <BR>
  +         * If it is set, then one checks that the associated topology is
  +         * still defined. It must be either the currently installed or the
  +         * one being prepared.
  +         */
  +        private NodeTopology markTopology(MsgHeader aHeader) {
  +            NodeTopology topo = topology;
  +            Integer version = (Integer)
  +                aHeader.getOptionalHeader(MsgHeaderConstants.TOPOLOGY_VERSION);
  +            if ( null == version ) {
  +                aHeader.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
  +                    new Integer(topo.getVersion()));
  +            } else if ( version.intValue() == preparedTopology.getVersion() ) {
  +                topo = preparedTopology;
  +            } else if ( version.intValue() != topo.getVersion() ) {
  +                throw new CommunicationException("Topology version " +
  +                    version + " too old.");
  +            }
  +            return topo;
           }
           
       }
  
  
  
  1.4       +18 -9     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManager.java
  
  Index: RemoteNodeManager.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManager.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- RemoteNodeManager.java	17 Jul 2004 03:49:29 -0000	1.3
  +++ RemoteNodeManager.java	20 Jul 2004 00:15:06 -0000	1.4
  @@ -48,16 +48,26 @@
       public void stop() throws NodeException;
       
       /**
  -     * Sets the Topology to be used to derive the path between two nodes.
  +     * Prepares the Topology to be used to derive the path between two nodes.
        * <BR>
  -     * When the topology is set, the manager tries to "apply" it: it creates
  -     * physical connections with all of its neighbours as defined by the
  -     * specified topology and drops the physical connections no more
  -     * required by the topology change.
  +     * When the topology is prepared, the manager tries to "apply" it: it 
  +     * creates physical connections to all of its neighbours as defined by the
  +     * specified topology. Physical connections no more required by the 
  +     * topology should not be dropped. These latter should be dropped only
  +     * if the topology is committed.
        * 
        * @param aTopology Topology.
  +     * @exception NodeException Indicates that the topology can not be prepared.
        */
  -    public void setTopology(NodeTopology aTopology);
  +    public void prepareTopology(NodeTopology aTopology) throws NodeException;
  +    
  +    /**
  +     * Commits the Topology which has been previously prepared.
  +     * <BR>
  +     * When a topology is committed, the physical connections defined
  +     * by the previous topology and not by the one to be committed are dropped.
  +     */
  +    public void commitTopology();
       
       /**
        * Adds a listener for RemoteNode event.
  @@ -77,9 +87,8 @@
        * Leaves a remote node. 
        * 
        * @param aNodeInfo Meta-data of the node to be left.
  -     * @throws NodeException
        */
  -    public void leaveRemoteNode(NodeInfo aNodeInfo) throws NodeException;
  +    public void leaveRemoteNode(NodeInfo aNodeInfo);
       
       /**
        * Finds or joins a remote node.
  
  
  
  1.3       +5 -15     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/MessagingTransportFactory.java
  
  Index: MessagingTransportFactory.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/MessagingTransportFactory.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- MessagingTransportFactory.java	24 Jun 2004 23:39:03 -0000	1.2
  +++ MessagingTransportFactory.java	20 Jul 2004 00:15:06 -0000	1.3
  @@ -42,23 +42,13 @@
       /**
        * Creates a RemoteNode providing a local view of the remote node aNodeInfo.
        * 
  -     * @param aNodeInfo Remote node meta-data.
  +     * @param aLocalNodeInfo Local node meta-data.
  +     * @param aRemoteNodeInfo Remote node meta-data.
        * @param anIOContext Used to retrieve the IOContext to be used to
        * communicate with the remote node.
        * @return RemoteNode.
        */
  -    public RemoteNode factoryRemoteNode(
  -        NodeInfo aNodeInfo, IOContext anIOContext);
  +    public RemoteNode factoryRemoteNode(NodeInfo aLocalNodeInfo, 
  +        NodeInfo aRemoteNodeInfo, IOContext anIOContext);
   
  -    /**
  -     * Creates a RemoteNodeConnection to the remote node aNodeInfo.
  -     * 
  -     * @param aNodeInfo Remote node meta-data.
  -     * @param anIOContext Used to retrieve the IOContext to be used to
  -     * communicate with the remote node.
  -     * @return RemoteNodeConnection.
  -     */
  -    public RemoteNodeConnection factoryRemoteNodeConnection(
  -        NodeInfo aNodeInfo, IOContext anIOContext);
  -    
   }
  
  
  
  1.1                  incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/AbstractRemoteNode.java
  
  Index: AbstractRemoteNode.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.messaging.remotenode;
  
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.geronimo.messaging.MsgHeaderConstants;
  import org.apache.geronimo.messaging.NodeException;
  import org.apache.geronimo.messaging.NodeInfo;
  import org.apache.geronimo.messaging.interceptors.HeaderOutInterceptor;
  import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
  import org.apache.geronimo.messaging.io.IOContext;
  import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection.LifecycleListener;
  
  /**
   * Abstract implememtation for the RemoteNode contracts.
   *
   * @version $Revision: 1.1 $ $Date: 2004/07/20 00:15:06 $
   */
  public abstract class AbstractRemoteNode
      implements RemoteNode
  {
  
      private static final Log log = LogFactory.getLog(AbstractRemoteNode.class);
  
      /**
       * Local node meta-data.
       */
      protected final NodeInfo localNodeInfo;
      
      /**
       * Manager of this remote node.
       */
      protected RemoteNodeManager manager;
      
      /**
       * Remote node meta-data.
       */
      protected NodeInfo remoteNodeInfo;
  
      protected final IOContext ioContext;
  
      /**
       * Connection opened to this remote node.
       */
      private RemoteNodeConnection connection;
      
      /**
       * Incoming Msgs (coming from remote nodes) are pushed to this output.
       */
      protected MsgOutInterceptor producerOut; 
      
  
      public AbstractRemoteNode(NodeInfo aLocalNode, IOContext anIOContext) {
          if ( null == aLocalNode ) {
              throw new IllegalArgumentException("Local NodeInfo is required.");
          } else if ( null == anIOContext ) {
              throw new IllegalArgumentException("IOContext is required.");
          }
          localNodeInfo = aLocalNode;
          ioContext = anIOContext;
      }
      
      public AbstractRemoteNode(NodeInfo aLocalNodeInfo, NodeInfo aRemoteNodeInfo,
          IOContext anIOContext) {
          this(aLocalNodeInfo, anIOContext);
          if ( null == aRemoteNodeInfo ) {
              throw new IllegalArgumentException("Remote NodeInfo is required.");
          }
          remoteNodeInfo = aRemoteNodeInfo;
      }
      
      public void setManager(RemoteNodeManager aManager) {
          manager = aManager;
      }
      
      public void setMsgProducerOut(MsgOutInterceptor aMsgOut) {
          producerOut = aMsgOut;
          if ( null == connection ) {
              return;
          }
          connection.setMsgProducerOut(aMsgOut);
      }
  
      public MsgOutInterceptor getMsgConsumerOut() {
          return new HeaderOutInterceptor(
              MsgHeaderConstants.DEST_NODE,
              remoteNodeInfo,
              connection.getMsgConsumerOut());
      }
      
      public NodeInfo getNodeInfo() {
          return remoteNodeInfo;
      }
  
      protected void setConnection(RemoteNodeConnection aConnection)
          throws NodeException {
          if ( null != connection && null != aConnection ) {
              throw new IllegalArgumentException("Connection already defined.");
          } else if ( null != connection ) {
              connection.close();
              connection = null;
              return;
          }
          connection = aConnection;
          connection.open();
          connection.setMsgProducerOut(producerOut);
          connection.setLifecycleListener(new LifecycleListener() {
              public void onClose() {
                  manager.unregisterRemoteNode(AbstractRemoteNode.this);
              }
          });
      }
      
      public void leave() {
          connection.close();
          connection.setMsgProducerOut(null);
          connection = null;
      }
      
  }