You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/03/01 14:16:36 UTC

cvs commit: incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote RemoteUseCaseTest.java

gdamour     2004/03/01 05:16:36

  Modified:    sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        ServerNode.java ServerProcessors.java
               sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote
                        RemoteUseCaseTest.java
  Added:       sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        CommunicationException.java NodeInfo.java
                        MetaConnection.java
  Log:
  Remove the ServantNode concept.
  
  Each node is a ServerNode, which can join another node uniquely
  identified on the network by its NodeInfo property.
  
  Revision  Changes    Path
  1.2       +76 -132   incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerNode.java
  
  Index: ServerNode.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerNode.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ServerNode.java	25 Feb 2004 13:36:15 -0000	1.1
  +++ ServerNode.java	1 Mar 2004 13:16:35 -0000	1.2
  @@ -20,7 +20,6 @@
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.OutputStream;
  -import java.net.InetAddress;
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.Iterator;
  @@ -43,13 +42,10 @@
    * It is also in charge of dispatching the incoming Msgs to the registered
    * Connectors.
    * <BR>
  - * A ServantNode is the counterpart of a ServerNode: it allows to access a
  - * ServerNode remotely.
  - * <BR>
  - * The following diagram shows how ServerNode, ServantNode and Connector are
  - * combined together:
  + * The following diagram shows how ServerNodes and Connectors are combined
  + * together:
    * 
  - * Connector -- MTO -- ServantNode -- MTO -- ServerNode -- OTM -- Connector
  + * Connector -- MTO -- ServerNode -- MTO -- ServerNode -- OTM -- Connector
    *
    * Connector communicates with each other by sending Msgs.
    *
  @@ -63,9 +59,9 @@
       private static final Log log = LogFactory.getLog(ServerNode.class);
   
       /**
  -     * Server name.
  +     * Node meta-data.
        */
  -    private final String name;
  +    private final NodeInfo nodeInfo;
       
       /**
        * Connectors registered by this server.
  @@ -104,7 +100,12 @@
       /**
        * Processors of this server.
        */
  -    private final ServerProcessors processors;
  +    final ServerProcessors processors;
  +    
  +    /**
  +     * MetaConnection to other nodes.
  +     */
  +    final MetaConnection metaConnection;
       
       private GBeanContext context;
   
  @@ -119,24 +120,22 @@
        * @param aMaxRequest Maximum number of concurrent requests, which can be
        * processed by this server.
        */
  -    public ServerNode(String aName, Collection aCollOfConnectors,
  -        InetAddress anAddress, int aPort, int aMaxRequest) {
  -        super(anAddress, aPort);
  -        if ( null == aName ) {
  -            throw new IllegalArgumentException("Name is required.");
  -        }
  +    public ServerNode(NodeInfo aNodeInfo, Collection aCollOfConnectors,
  +        int aMaxRequest) {
  +        super(aNodeInfo.getAddress(), aNodeInfo.getPort());
           
  -        name = aName;
  +        nodeInfo = aNodeInfo;
  +        metaConnection = new MetaConnection(this);
           
           // No socket timeout.
           setMaxIdleTimeMs(0);
           
  -        streamManager = new StreamManagerImpl(name);
  +        streamManager = new StreamManagerImpl(getName());
           
           processors = new ServerProcessors(this);
           
  -        queueIn = new MsgQueue(aName + " Inbound");
  -        queueOut = new MsgQueue(aName + " Outbound");
  +        queueIn = new MsgQueue(getName() + " Inbound");
  +        queueOut = new MsgQueue(getName() + " Outbound");
           
           connections = new HashMap();
           
  @@ -166,6 +165,48 @@
       }
   
       /**
  +     * Gets the name of this node.
  +     */
  +    public String getName() {
  +        return nodeInfo.getName();
  +    }
  +    
  +    /**
  +     * Gets the NodeInfo of this node.
  +     * 
  +     * @return NodeInfo.
  +     */
  +    public NodeInfo getNodeInfo() {
  +        return nodeInfo;
  +    }
  +    
  +    /**
  +     * Joins the node uniquely identified on the network by aNodeInfo.
  +     * 
  +     * @param aNodeInfo NodeInfo of a remote node to join.
  +     * @throws IOException Indicates that an I/O error has occurred.
  +     * @throws CommunicationException Indicates that the node can not be
  +     * registered by the remote node identified by aNodeInfo.
  +     */
  +    public void join(NodeInfo aNodeInfo)
  +        throws IOException, CommunicationException {
  +        metaConnection.join(aNodeInfo);
  +    }
  +    
  +    /**
  +     * Leaves the node uniquely identified on the network by aNodeInfo.
  +     * 
  +     * @param aNodeInfo NodeInfo of the remote node to leave.
  +     * @throws IOException Indicates that an I/O error has occurred.
  +     * @throws CommunicationException Indicates that the node has not left
  +     * the remote node successfully.
  +     */
  +    public void leave(NodeInfo aNodeInfo)
  +        throws IOException, CommunicationException {
  +        metaConnection.leave(aNodeInfo);
  +    }
  +    
  +    /**
        * Gets the StreamManager of this server.
        * 
        * @return StreamManager used by this server to resolve/encode InputStreams.
  @@ -175,17 +216,14 @@
       }
   
       /**
  -     * Gets the Output to be used to communicate with the specified servant.
  +     * Gets the Output to be used to communicate with the specified node.
        * 
  -     * @param aServantName Servant name.
  -     * @return Output to be used to communicate with the specified servant.
  +     * @param aNodeName Node name.
  +     * @return Output to be used to communicate with the specified node.
        */
  -    public MsgOutInterceptor getOutForServant(Object aServantName) {
  -        ConnectionWrapper connection; 
  -        synchronized (connections) {
  -            connection = (ConnectionWrapper) connections.get(aServantName);
  -        }
  -        return connection.out;
  +    public MsgOutInterceptor getOutForNode(String aNodeName)
  +        throws CommunicationException {
  +        return metaConnection.getOutForNode(aNodeName);
       }
       
       /**
  @@ -226,19 +264,13 @@
        * Handles a new connection.
        */
       protected void handleConnection(InputStream anIn,OutputStream anOut) {
  -        ConnectionWrapper connection = initConnection(anIn, anOut);
  -        
  -        // Wait until the end of the connection.
  -        Object releaser = connection.endReleaser;
  -        synchronized (releaser) {
  -            try {
  -                releaser.wait();
  -            } catch (InterruptedException e) {
  -                log.error(e);
  -            }
  +        try {
  +            metaConnection.joined(anIn, anOut);
  +        } catch (IOException e) {
  +            log.error(e);
  +        } catch (CommunicationException e) {
  +            log.error(e);
           }
  -        
  -        removeConnection(connection);
       }
       
       public void setGBeanContext(GBeanContext aContext) {
  @@ -267,97 +299,9 @@
   
           processors.stop();
       }
  -
  -    /**
  -     * Initializes a connection. Checks that a connection with the same name
  -     * is not already registered by the server. If a connection with the same
  -     * name exists, then the server refuses the connection and exits. Otherwise,
  -     * a connection is registered with the provided name.
  -     * 
  -     * @param anIn Raw input of the connection.
  -     * @param anOut Raw output of the connection.
  -     * @return Connection.
  -     */
  -    private ConnectionWrapper initConnection(
  -        InputStream anIn, OutputStream anOut) {
  -        ConnectionWrapper connection = new ConnectionWrapper(anIn, anOut);
  -        
  -        Msg msg = connection.in.pop();
  -        MsgBody body = msg.getBody();
  -        String cName = (String) body.getContent();
  -        
  -        msg = new Msg();
  -        body = msg.getBody();
  -        synchronized (connections) {
  -            if ( connections.containsKey(cName) ) {
  -                body.setContent(Boolean.FALSE);
  -                connection.out.push(msg);
  -                throw new RuntimeException(cName + " already registered");
  -            }
  -            connection.nodeName = cName;
  -            addConnection(connection);
  -        }
  -        body.setContent(Boolean.TRUE);
  -        connection.out.push(msg);
  -        return connection;
  -    }
  -
  -    /**
  -     * Releases a connection.
  -     * 
  -     * @param aConnection Connection to be released.
  -     */
  -    private void removeConnection(ConnectionWrapper aConnection) {
  -        synchronized(connections) {
  -            connections.remove(aConnection.nodeName);
  -        }
  -        processors.stopConnection(aConnection);
  -        aConnection.close();
  -    }
  -
  -    /**
  -     * Registers a connection.
  -     * 
  -     * @param aConnection Connection to be registered.
  -     */
  -    private void addConnection(ConnectionWrapper aConnection) {
  -        synchronized(connections) {
  -            connections.put(aConnection.nodeName, aConnection);
  -            processors.startConnection(aConnection);
  -        }
  -    }
       
  -    class ConnectionWrapper {
  -        final MsgInInterceptor in;
  -        final InputStream rawIn;
  -        final MsgOutInterceptor out;
  -        final OutputStream rawOut;
  -        String nodeName;
  -        final Object endReleaser;
  -        private ConnectionWrapper(InputStream anIn, OutputStream anOut) {
  -            rawIn = anIn;
  -            in = new StreamInInterceptor(rawIn, streamManager);
  -            rawOut = anOut;
  -            out =
  -                new HeaderOutInterceptor(
  -                    MsgHeaderConstants.SRC_NODE,
  -                    name,
  -                    new StreamOutInterceptor(anOut, streamManager));
  -            endReleaser = new Object();
  -        }
  -        
  -        private void close() {
  -            try {
  -                rawIn.close();
  -            } catch (IOException e) {
  -                log.error("Can not close input", e);
  -            }
  -            try {
  -                rawOut.close();
  -            } catch (IOException e) {
  -                log.error("Can not close output", e);
  -            }
  -        }
  +    public String toString() {
  +        return "Node {" + nodeInfo + "}";
       }
   
   }
  
  
  
  1.2       +16 -62    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java
  
  Index: ServerProcessors.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ServerProcessors.java	25 Feb 2004 13:36:15 -0000	1.1
  +++ ServerProcessors.java	1 Mar 2004 13:16:35 -0000	1.2
  @@ -56,32 +56,19 @@
           streamManager = aServer.getStreamManager();
       }
       
  -    public Processors getProcessors() {
  -        return processors;
  -    }
  -    
       /**
  -     * Starts popping Msg from the provided connection and adds them to the
  -     * inbound Msg queue.
  -     * 
  -     * @param aConnection Connection to be popped.
  +     * Execute a Processor in a separate Thread.
  +     *  
  +     * @param aProcessor Processor to be executed.
        */
  -    public void startConnection(ServerNode.ConnectionWrapper aConnection) {
  -        InboundQueueFiller filler = new InboundQueueFiller(aConnection);
  -        processors.execute(filler);
  +    public void execute(Processor aProcessor) {
  +        processors.execute(aProcessor);
       }
       
  -    /**
  -     * Stops popping Msg from the provided connection.
  -     * 
  -     * @param aConnection Connection to be stopped.
  -     */
  -    public void stopConnection(ServerNode.ConnectionWrapper aConnection) {
  -        Object releaser = aConnection.endReleaser;
  -        synchronized(releaser) {
  -            releaser.notify();
  -        }
  +    public Processors getProcessors() {
  +        return processors;
       }
  +    
       /**
        * Dispatches the Msgs seating in the inbound queue. Pushes the Msg seating
        * in the outbound queue to the relevant node.
  @@ -95,45 +82,6 @@
       }
       
       /**
  -     * Inbound queue filler.
  -     */
  -    private class InboundQueueFiller implements Processor {
  -
  -        /**
  -         * Connection to read Msg from.
  -         */
  -        private final ServerNode.ConnectionWrapper connection;
  -        
  -        /**
  -         * Is this Processor started.
  -         */
  -        private volatile boolean isStarted = true;
  -        
  -        /**
  -         * Pops Msgs from the specified connection and adds them to the
  -         * inbound queue.
  -         * 
  -         * @param aConnection Connection to read Msg from.
  -         */
  -        private InboundQueueFiller(ServerNode.ConnectionWrapper aConnection) {
  -            connection = aConnection;
  -        }
  -        
  -        public void run() {
  -            QueueOutInterceptor out = new QueueOutInterceptor(server.queueIn);
  -            while ( isStarted ) {
  -                Msg msg = connection.in.pop();
  -                out.push(msg);
  -            }
  -        }
  -
  -        public void release() {
  -            isStarted = false;
  -        }
  -        
  -    }
  -    
  -    /**
        * Runnable in charge of dispatching the Msgs seating in the outbound
        * queue to the relevant node. 
        */
  @@ -152,7 +100,13 @@
               while ( isStarted ) {
                   Msg msg = in.pop();
                   Object destNode = in.getHeader();
  -                MsgOutInterceptor out = server.getOutForServant(destNode);
  +                MsgOutInterceptor out;
  +                try {
  +                    out = server.getOutForNode((String) destNode);
  +                } catch (CommunicationException e) {
  +                    log.error(e);
  +                    continue;
  +                }
                   out.push(msg);
               }
           }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/CommunicationException.java
  
  Index: CommunicationException.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  /**
   * Checked exception signalling that the communication between two nodes
   * has failed.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/01 13:16:35 $
   */
  public class CommunicationException extends Exception {

      /**
       * Creates an exception which only carries a description of the failure.
       *
       * @param aMessage Description of the communication failure.
       */
      public CommunicationException(final String aMessage) {
          super(aMessage);
      }

      /**
       * Creates an exception which carries a description of the failure and
       * the underlying problem which triggered it.
       *
       * @param aMessage Description of the communication failure.
       * @param aNested Root cause of the failure.
       */
      public CommunicationException(final String aMessage,
          final Throwable aNested) {
          super(aMessage, aNested);
      }

  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeInfo.java
  
  Index: NodeInfo.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import java.io.Serializable;
  import java.net.InetAddress;
  
  /**
   * Wraps the properties of a node, which identify it uniquely on the network.
   * <BR>
   * This class could be wrapped in a packet and send to a multicast group in
   * order to notify the availability of a new node to other nodes. These other
   * nodes could then decide to join it or not.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/01 13:16:35 $
   */
  public class NodeInfo implements Serializable
  {
  
      /**
       * Name.
       */
      private final String name;
      /**
       * Listening address.
       */
      private final InetAddress address;
      /**
       * Listening port.
       */
      private final int port;
      
      /**
       * Creates a NodeInfo defining uniquely a node on a network.
       * 
       * @param aName Name of the node.
       * @param anAddess Address that the node is listening on.
       * @param aPort Listening port.
       */
      public NodeInfo(String aName, InetAddress anAddess, int aPort) {
          if ( null == aName ) {
              throw new IllegalArgumentException("Name is required.");
          } else if ( null == anAddess ) {
              throw new IllegalArgumentException("Address is required.");
          } else if ( 0 == aPort ) {
              throw new IllegalArgumentException("Port is required.");
          }
          name = aName;
          address = anAddess;
          port = aPort;
      }
      
      /**
       * Gets the listening address of the node providing this instance. 
       * 
       * @return Listening address.
       */
      public InetAddress getAddress() {
          return address;
      }
  
      /**
       * Gets the name of the node providing this instance.
       * 
       * @return Node name.
       */
      public String getName() {
          return name;
      }
  
      /**
       * Gets the listening port of the node providing this instance.
       * 
       * @return Listening port.
       */
      public int getPort() {
          return port;
      }
      
      public boolean equals(Object obj) {
          if ( false == obj instanceof NodeInfo ) {
              return false;
          }
          NodeInfo other = (NodeInfo) obj;
          return name.equals(other.name) && address.equals(other.address) &&
              port == other.port;
      }
      
      public String toString() {
          return "NodeInfo: node name = {" + name + "}; address = {" + address +
              "}; port = {" + port + "}";
      }
  
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java
  
  Index: MetaConnection.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.net.Socket;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Map;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  /**
   * This is a connection of connections: it keeps track of the logical
   * connections between the node owning it and the remote nodes, which have
   * either joined this node or been joined by it.
   * <BR>
   * The registry of connections is guarded by its own monitor. A connection
   * unregisters itself when it is closed, whatever the reason of the closure
   * (explicit leave, failure of the Msg copier or end of the remote peer).
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/01 13:16:35 $
   */
  public class MetaConnection
  {

      private static final Log log = LogFactory.getLog(MetaConnection.class);
      
      /**
       * Node owning this connection.
       */
      private final ServerNode node;
      
      /**
       * NodeInfo to Connection map. All accesses are synchronized on the map
       * itself.
       */
      private final Map connections;
      
      /**
       * Creates a meta-connection for the specified node.
       * 
       * @param aNode Node.
       * @throws IllegalArgumentException Indicates that aNode is null.
       */
      public MetaConnection(ServerNode aNode) {
          if ( null == aNode ) {
              throw new IllegalArgumentException("Node is required.");
          }
          node = aNode;
          connections = new HashMap();
      }
      
      /**
       * Gets the Msg output to be used to communicate with the node aNodeName.
       * 
       * @param aNodeName Node name.
       * @return Msg output.
       * @throws CommunicationException Indicates that the node aNodeName is not
       * registered by this connection.
       */
      public MsgOutInterceptor getOutForNode(String aNodeName)
          throws CommunicationException {
          Connection connection = null;
          // The registry is keyed by NodeInfo, hence the linear search by name.
          synchronized(connections) {
              for (Iterator iter = connections.entrySet().iterator();
                  iter.hasNext();) {
                  Map.Entry entry = (Map.Entry) iter.next();
                  NodeInfo nodeInfo = (NodeInfo) entry.getKey();
                  if ( nodeInfo.getName().equals(aNodeName) ) {
                      connection = (Connection) entry.getValue();
                      break;
                  }
              }
          }
          
          if ( null == connection ) {
              throw new CommunicationException("Node {" + aNodeName +
                  "} is not know by {" + node.getName() + "}");
          }
          
          return connection.out;
      }
      
      /**
       * Tests if the specified NodeInfo is already registered by this
       * meta-connection.
       * 
       * @param aNodeInfo NodeInfo defining a Node.
       * @return true if the node info is already registered.
       */
      public boolean isRegistered(NodeInfo aNodeInfo) {
          synchronized(connections) {
              return connections.containsKey(aNodeInfo);
          }
      }
      
      /**
       * Creates a new connection on top of the provided input and output streams
       * and waits for the end or failure of this connection prior to return.
       * <BR>
       * These streams should have been provided by a remote node, which is
       * joining the node owning this meta-connection.
       * <BR>
       * This method reads the NodeInfo of the remote node and tries to register
       * it with this node. The remote node is notified of the outcome by a Msg
       * whose body is either Boolean.TRUE or Boolean.FALSE.
       * 
       * @param anIn InputStream opened by a remote node on the node owning this
       * meta-connection.
       * @param anOut OutputStream.
       * @throws IOException Indicates that an I/O error has occured.
       * @throws CommunicationException Indicates that the remote node has not
       * provided a NodeInfo or that the provided NodeInfo conflicts with the
       * current NodeInfo registrations.
       */
      public void joined(InputStream anIn, OutputStream anOut)
          throws IOException, CommunicationException {
          Connection connection = new Connection(anIn, anOut);

          // The first Msg sent by a joining node carries its NodeInfo.
          Msg msg = connection.in.pop();
          Object content = msg.getBody().getContent();
          if ( false == content instanceof NodeInfo ) {
              acknowledge(connection, Boolean.FALSE);
              throw new CommunicationException(
                  "NodeInfo expected; received {" + content + "}");
          }
          NodeInfo otherNodeInfo = (NodeInfo) content;
          
          // Check and registration are done under the same lock such that two
          // nodes providing the same NodeInfo can not both be accepted.
          boolean accepted;
          synchronized(connections) {
              accepted = !connections.containsKey(otherNodeInfo);
              if ( accepted ) {
                  connections.put(otherNodeInfo, connection);
              }
          }
          if ( !accepted ) {
              acknowledge(connection, Boolean.FALSE);
              throw new CommunicationException(
                  otherNodeInfo + " already registered");
          }
          
          try {
              acknowledge(connection, Boolean.TRUE);
              // Pops the input stream of the connection and fills in the
              // inbound Msg queue.
              QueueOutInterceptor out = new QueueOutInterceptor(node.queueIn);
              MsgCopier copier = new MsgCopier(
                  connection.in, out, connection.listener);
              node.processors.execute(copier);
              connection.waitForEnd();
          } finally {
              // No-op if the connection has ended normally; otherwise ensures
              // that a failed or interrupted connection does not stay
              // registered.
              connection.close();
          }
      }
      
      /**
       * Creates a new connection to the node uniquely identified on the network
       * by the provided NodeInfo.
       * 
       * @param aNodeInfo NodeInfo of a node.
       * @throws IOException Indicates that an I/O error has occured.
       * @throws CommunicationException Indicates that the node owning this
       * meta-connection can not be registered by the remote node identified by
       * aNodeInfo.
       * @throws IllegalArgumentException Indicates that aNodeInfo is null or
       * already registered by this meta-connection.
       */
      public void join(NodeInfo aNodeInfo)
          throws IOException, CommunicationException {
          if ( isRegistered(aNodeInfo) ) {
              throw new IllegalArgumentException("{" + aNodeInfo +
                  "} is already registered by {" + node + "}");
          }
          
          Connection connection = new Connection(aNodeInfo);
          boolean established = false;
          try {
              // Try to register this node with the other one.
              Msg msg = new Msg();
              MsgBody body = msg.getBody();
              body.setContent(node.getNodeInfo());
              connection.out.push(msg);
              
              msg = connection.in.pop();
              // In case of successful registration, the server returns true.
              // Any other reply is handled as a refusal.
              Object success = msg.getBody().getContent();
              if ( !Boolean.TRUE.equals(success) ) {
                  throw new CommunicationException("Can not register Node {" +
                      node.getNodeInfo() + "} with {" + aNodeInfo + "}");
              }
              synchronized (connections) {
                  connections.put(aNodeInfo, connection);
              }
              // Pops the input stream of the connection and fills in the
              // inbound Msg queue.
              QueueOutInterceptor out = new QueueOutInterceptor(node.queueIn);
              MsgCopier copier = new MsgCopier(
                  connection.in, out, connection.listener);
              node.processors.execute(copier);
              established = true;
          } finally {
              // Do not leak the socket opened by the connection if the
              // registration has failed.
              if ( !established ) {
                  connection.close();
              }
          }
      }
      
      /**
       * Closes the connection to the node identified by aNodeInfo.
       * 
       * @param aNodeInfo NodeInfo of a remote node.
       * @throws IOException Indicates that an I/O error has occured.
       * @throws CommunicationException Indicates that the node owning this 
       * meta-connection has not left the remote node successfully.
       * @throws IllegalArgumentException Indicates that aNodeInfo is not
       * registered by this meta-connection.
       */
      public void leave(NodeInfo aNodeInfo)
          throws IOException, CommunicationException {
          Connection connection;
          // Lookup and removal are a single atomic operation.
          synchronized (connections) {
              connection = (Connection) connections.remove(aNodeInfo);
          }
          if ( null == connection ) {
              throw new IllegalArgumentException("{" + aNodeInfo +
                  "} is not registered by {" + node + "}");
          }
          connection.close();
      }

      /**
       * Sends the outcome of a registration request to the node at the other
       * end of the provided connection.
       * 
       * @param aConnection Connection to the node requesting a registration.
       * @param anAnswer Boolean.TRUE if the registration is accepted.
       */
      private void acknowledge(Connection aConnection, Boolean anAnswer) {
          Msg msg = new Msg();
          msg.getBody().setContent(anAnswer);
          aConnection.out.push(msg);
      }

      /**
       * Logical connection.
       */
      private class Connection {
          /**
           * Allows reading from the connection in Msg mode.
           */
          private final MsgInInterceptor in;
          /**
           * Raw InputStream.
           */
          private final InputStream rawIn;
          /**
           * Allows writing to the connection in Msg mode.
           */
          private final MsgOutInterceptor out;
          /**
           * Raw OutputStream.
           */
          private final OutputStream rawOut;
          /**
           * Receives notification when the copier poping and pushing Msgs to
           * the raw InputStream and OutputStream fails.
           */
          private final MsgCopier.CopierListener listener =
              new MsgCopier.NullCopierListener() {
                  public void onFailure() {
                      close();
                  }
              };
          /**
           * Monitor used to wait the end of this connection. It also guards
           * the two flags below.
           */
          private final Object endReleaser = new Object();
          /**
           * Set by the first call to close, such that the closure is executed
           * only once.
           */
          private boolean closeRequested;
          /**
           * Set when the closure is completed. It allows waitForEnd to return
           * immediately if the connection has been closed before the wait and
           * to ignore spurious wake-ups.
           */
          private boolean ended;
          
          /**
           * Creates a connection wrapping the provided input and output streams.
           *  
           * @param anIn InputStream of the connection.
           * @param anOut OutputStream of the connection.
           * @exception IOException Indicates that an I/O error has occured.
           */
          private Connection(InputStream anIn, OutputStream anOut)
              throws IOException {
              if ( null == anIn ) {
                  throw new IllegalArgumentException("InputStream is required.");
              } else if ( null == anOut ) {
                  throw new IllegalArgumentException("OutputStream is required.");
              }
              
              rawIn = anIn;
              in = new StreamInInterceptor(rawIn, node.getStreamManager());
              rawOut = anOut;
              // One adds the name of this node on exit.
              out =
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.SRC_NODE,
                      node.getName(),
                      new StreamOutInterceptor(rawOut, node.getStreamManager()));
          }
          /**
           * Creates a connection to the node defined by aNodeInfo. 
           * 
           * @param aNodeInfo NodeInfo of a node.
           * @exception IOException Indicates that an I/O error has occured.
           */
          private Connection(NodeInfo aNodeInfo)
              throws IOException {
              if ( null == aNodeInfo ) {
                  throw new IllegalArgumentException("NodeInfo is required.");
              }
              Socket socket =
                  new Socket(aNodeInfo.getAddress(), aNodeInfo.getPort());
              
              rawIn = socket.getInputStream();
              in = new StreamInInterceptor(rawIn, node.getStreamManager());
              rawOut = socket.getOutputStream();
              // One adds the name of this node on exit.
              out =
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.SRC_NODE,
                      node.getName(),
                      new StreamOutInterceptor(rawOut, node.getStreamManager()));
          }

          /**
           * Close the logical connection: unregisters it from the enclosing
           * meta-connection, closes the raw streams and releases the thread
           * waiting for the end of this connection, if any.
           * <BR>
           * Only the first call has an effect.
           */
          private void close() {
              synchronized(endReleaser) {
                  if ( closeRequested ) {
                      return;
                  }
                  closeRequested = true;
              }
              // A closed connection must no more be returned by
              // getOutForNode, nor prevent its node from joining again.
              synchronized(connections) {
                  connections.values().remove(this);
              }
              try {
                  rawIn.close();
              } catch (IOException e) {
                  log.error("Can not close input", e);
              }
              try {
                  rawOut.close();
              } catch (IOException e) {
                  log.error("Can not close output", e);
              }
              synchronized(endReleaser) {
                  ended = true;
                  endReleaser.notifyAll();
              }
          }
          
          /**
           * Waits until the end of this connection. Returns immediately if the
           * connection is already closed. If the waiting thread is
           * interrupted, its interrupt status is restored and the method
           * returns.
           */
          private void waitForEnd() {
              synchronized(endReleaser) {
                  while ( !ended ) {
                      try {
                          endReleaser.wait();
                      } catch (InterruptedException e) {
                          log.error(e);
                          Thread.currentThread().interrupt();
                          return;
                      }
                  }
              }
          }
      }
      
  }
  
  
  
  1.2       +18 -10    incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/RemoteUseCaseTest.java
  
  Index: RemoteUseCaseTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/RemoteUseCaseTest.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- RemoteUseCaseTest.java	29 Feb 2004 13:14:11 -0000	1.1
  +++ RemoteUseCaseTest.java	1 Mar 2004 13:16:36 -0000	1.2
  @@ -29,7 +29,7 @@
   import org.apache.geronimo.datastore.impl.local.LocalGFileManager;
   import org.apache.geronimo.datastore.impl.remote.datastore.GFileManagerClient;
   import org.apache.geronimo.datastore.impl.remote.datastore.GFileManagerProxy;
  -import org.apache.geronimo.datastore.impl.remote.messaging.ServantNode;
  +import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
   import org.apache.geronimo.datastore.impl.remote.messaging.ServerNode;
   
   /**
  @@ -39,6 +39,11 @@
    */
   public class RemoteUseCaseTest extends AbstractUseCaseTest {
   
  +    /**
  +     * In this set-up one initializes two nodes, namely Node1 and Node2. A
  +     * local GFileManager is mounted by Node1. A client GFileManager is mounted
  +     * by Node2. Node2 joins Node1.
  +     */
       protected void setUp() throws Exception {
           LockManager lockManager = new LockManager();
           File root = new File(System.getProperty("java.io.tmpdir"),
  @@ -49,18 +54,21 @@
           GFileManager delegate;
           delegate = new LocalGFileManager("test", root, lockManager);
           InetAddress address = InetAddress.getLocalHost();
  -        int port = 8080;
           GFileManagerProxy proxy = new GFileManagerProxy(delegate);
  -        ServerNode server = new ServerNode("MasterNode",
  -                Collections.singleton(proxy), address, port, 2);
  -        server.doStart();
  +        NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8080);
  +        ServerNode server1 = new ServerNode(nodeInfo1,
  +            Collections.singleton(proxy), 10);
  +        server1.doStart();
           proxy.doStart();
   
  -        fileManager = new GFileManagerClient("test");
  -        ServantNode servant = new ServantNode(
  -                "ChildNode", Collections.singleton(fileManager), address, port, 10);
  -        servant.doStart();
  +        fileManager = new GFileManagerClient("test", "Node1");
  +        NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
  +        ServerNode server2 = new ServerNode(nodeInfo2,
  +            Collections.singleton(fileManager), 10);
  +        server2.doStart();
           ((GFileManagerClient) fileManager).doStart();
  +        
  +        server2.join(nodeInfo1);
       }
       
   }