You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/03/18 13:14:06 UTC

cvs commit: incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging MsgInterceptorStoppedException.java ServerNode.java RequestSender.java Topology.java StreamOutputStream.java StreamInputStream.java MsgQueue.java MsgCopier.java HeaderReactor.java Processors.java Processor.java ServerProcessors.java MetaConnection.java

gdamour     2004/03/18 04:14:06

  Modified:    sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
                        DummyConnector.java
               sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        ServerNode.java RequestSender.java Topology.java
                        StreamOutputStream.java StreamInputStream.java
                        MsgQueue.java MsgCopier.java HeaderReactor.java
                        Processors.java Processor.java
                        ServerProcessors.java MetaConnection.java
  Added:       sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        MsgInterceptorStoppedException.java
  Log:
  o Partial implementation of doStop for a ServerNode.
  o ObjectStreamClass of instances sent across the wire are not
  systematically transfered. There are transfered the very first time
  along with an identifier. This latter is then subsequently provided.
  
  Revision  Changes    Path
  1.2       +11 -14    incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/DummyConnector.java
  
  Index: DummyConnector.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/DummyConnector.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DummyConnector.java	11 Mar 2004 15:36:14 -0000	1.1
  +++ DummyConnector.java	18 Mar 2004 12:14:05 -0000	1.2
  @@ -75,10 +75,10 @@
       public void deliver(Msg aMsg) {
           MsgHeader header = aMsg.getHeader();
           MsgBody.Type bodyType =
  -        (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
  -        if ( bodyType.equals(MsgBody.Type.REQUEST) ) {
  +            (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
  +        if ( MsgBody.Type.REQUEST == bodyType ) {
               handleRequest(aMsg);
  -        } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) {
  +        } else if ( MsgBody.Type.RESPONSE == bodyType ) {
               handleResponse(aMsg);
           }
       }
  @@ -100,18 +100,15 @@
                   MsgHeaderConstants.CORRELATION_ID,
                   id,
                   new HeaderOutInterceptor(
  -                    MsgHeaderConstants.DEST_CONNECTOR,
  -                    getName(),
  +                    MsgHeaderConstants.BODY_TYPE,
  +                    MsgBody.Type.RESPONSE,
                       new HeaderOutInterceptor(
  -                        MsgHeaderConstants.BODY_TYPE,
  -                        MsgBody.Type.RESPONSE,
  +                        MsgHeaderConstants.DEST_NODES,
  +                        node,
                           new HeaderOutInterceptor(
  -                            MsgHeaderConstants.DEST_NODES,
  -                            node,
  -                            new HeaderOutInterceptor(
  -                                MsgHeaderConstants.BODY_TYPE,
  -                                MsgBody.Type.RESPONSE,
  -                                out)))));
  +                            MsgHeaderConstants.BODY_TYPE,
  +                            MsgBody.Type.RESPONSE,
  +                            out))));
           reqOut.push(msg);
       }
   
  
  
  
  1.5       +3 -1      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerNode.java
  
  Index: ServerNode.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerNode.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- ServerNode.java	11 Mar 2004 15:36:14 -0000	1.4
  +++ ServerNode.java	18 Mar 2004 12:14:05 -0000	1.5
  @@ -301,11 +301,13 @@
   
       public void doStop() throws WaitingException, Exception {
           server.stop();
  +        metaConnection.stop();
           processors.stop();
       }
   
       public void doFail() {
           server.stop();
  +        metaConnection.stop();
           processors.stop();
       }
       
  
  
  
  1.5       +5 -2      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java
  
  Index: RequestSender.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- RequestSender.java	16 Mar 2004 14:48:59 -0000	1.4
  +++ RequestSender.java	18 Mar 2004 12:14:05 -0000	1.5
  @@ -197,7 +197,7 @@
       /**
        * Request identifier.
        */
  -    private static class RequestID implements Externalizable {
  +    public static class RequestID implements Externalizable {
           private int id;
           /**
            * Required for Externalization.
  @@ -205,6 +205,9 @@
           public RequestID() {}
           public RequestID(int anID) {
               id = anID;
  +        }
  +        public int getID() {
  +            return id;
           }
           public void writeExternal(ObjectOutput out) throws IOException {
               out.writeInt(id);
  
  
  
  1.3       +97 -23    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Topology.java
  
  Index: Topology.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Topology.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Topology.java	16 Mar 2004 14:48:59 -0000	1.2
  +++ Topology.java	18 Mar 2004 12:14:05 -0000	1.3
  @@ -35,7 +35,7 @@
    * Based on this knowledge, Topology is able to derive the shortest path - the
    * one having the lowest weight - between two nodes.
    * <BR>
  - * This class is intended to be send to a multicast group when a topology event
  + * This class is intended to be sent to a multicast group when a topology event
    * happens.
    *
    * @version $Revision$ $Date$
  @@ -49,28 +49,60 @@
        */
       private Map nodeToPaths;
   
  -    private Object nodeAndIdLock = new Object();
  -    private Map nodeToID;
  -    private Map idToNode;
  +    /**
  +     * Used to perform atomic operation on nodeToID and idToNode.
  +     */
  +    private final Object nodeAndIdLock = new Object();
       
  +    /**
  +     * NodeInfo to Integer (node identifier) mapping.
  +     */
  +    private final Map nodeToID;
  +    
  +    /**
  +     * Integer (node identifier) to NodeInfo mapping.
  +     */
  +    private final IndexedMap idToNode;
  +
  +    /**
  +     * TwoNodeInfo to NodeInfo[] mapping. It is a cache of the shortest paths
  +     * between two NodeInfos.
  +     */
  +    private final Map shortestPathCache;
  +
  +    /**
  +     * Used to generate NodeInfo identifiers.
  +     */
       private static int seqID = 0;
       
       public Topology() {
           nodeToPaths = new HashMap();
           nodeToID = new HashMap();
  -        idToNode = new HashMap();
  +        idToNode = new IndexedMap(100);
  +        shortestPathCache = new HashMap();
       }
   
  +    /**
  +     * Registers a node.
  +     * 
  +     * @param aNodeInfo Node to be registered.
  +     */
       private void registerNode(NodeInfo aNodeInfo) {
           synchronized(nodeAndIdLock) {
               if ( null == nodeToID.get(aNodeInfo) ) {
  -                Integer id = new Integer(seqID++);
  +                int intID = seqID++;
  +                Integer id = new Integer(intID);
                   nodeToID.put(aNodeInfo, id);
  -                idToNode.put(id, aNodeInfo);
  +                idToNode.put(intID, aNodeInfo);
               }
           }
       }
       
  +    /**
  +     * Unregisters a node.
  +     *
  +     * @param aNodeInfo Node to be unregistered.
  +     */
       private void unregisterNode(NodeInfo aNodeInfo) {
           synchronized(nodeAndIdLock) {
               if ( null == nodeToID.remove(aNodeInfo) ) {
  @@ -106,6 +138,9 @@
               }
               related.add(new NodeWrapper(aPath.nodeOne, aPath.weigthTwoToOne));
           }
  +        synchronized (shortestPathCache) {
  +            shortestPathCache.clear();
  +        }
       }
       
       /**
  @@ -125,6 +160,9 @@
               }
           }
           unregisterNode(aNode);
  +        synchronized (shortestPathCache) {
  +            shortestPathCache.clear();
  +        }
       }
       
       /**
  @@ -164,6 +202,9 @@
                   nodeToPaths.remove(aPath.nodeTwo);
               }
           }
  +        synchronized (shortestPathCache) {
  +            shortestPathCache.clear();
  +        }
       }
   
       /**
  @@ -176,6 +217,17 @@
        * returned if these two nodes are not connected.
        */
       public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
  +        // First check if the path has already been computed.
  +        TwoNodeInfo twoNodeInfo = new TwoNodeInfo(aSource, aTarget);
  +        NodeInfo[] result;
  +        synchronized(shortestPathCache) {
  +            result = (NodeInfo[]) shortestPathCache.get(twoNodeInfo);
  +        }
  +        if ( null != result ) {
  +            return result;
  +        }
  +        
  +        // This is the first time that this path needs to be computed.
           Map tmpNodeToRelated;
           synchronized(nodeToPaths) {
               tmpNodeToRelated = new HashMap(nodeToPaths);
  @@ -189,6 +241,7 @@
           int minWeight = -1;
           int minPathIndex = 0;
           int index = 0;
  +        // Gets the shortest path amongst the available paths.
           for (Iterator iter = paths.iterator(); iter.hasNext();index++) {
               int weight = 0;
               List nodeList = (List) iter.next();
  @@ -202,12 +255,15 @@
               }
           }
           List path = (List) paths.get(minPathIndex);
  -        NodeInfo[] result = new NodeInfo[path.size()];
  +        result = new NodeInfo[path.size()];
           int i = 0;
           for (Iterator iter = path.iterator(); iter.hasNext();) {
               NodeWrapper wrapper = (NodeWrapper) iter.next();
               result[i++] = wrapper.node;
           }
  +        synchronized(shortestPathCache) {
  +            shortestPathCache.put(twoNodeInfo, result);
  +        }
           return result;
       }
   
  @@ -257,11 +313,14 @@
           return id.intValue();
       }
       
  +    /**
  +     * Gets the NodeInfo having the specified identifier.
  +     * 
  +     * @param anId Node identifier.
  +     * @return NodeInfo having this identifier.
  +     */
       public NodeInfo getNodeById(int anId) {
  -        NodeInfo nodeInfo;
  -        synchronized(nodeAndIdLock) {
  -            nodeInfo = (NodeInfo) idToNode.get(new Integer(anId));
  -        }
  +        NodeInfo nodeInfo = (NodeInfo) idToNode.get(anId);
           if ( null == nodeInfo ) {
               throw new IllegalArgumentException("Identifier " + anId +
                   " is not registered by this topology.");
  @@ -302,7 +361,10 @@
           }
       }
       
  -    private static class NodeWrapper implements Externalizable {
  +    /**
  +     * NodeInfo wrapper. It allows storing a weight with a NodeInfo.
  +     */
  +    private static class NodeWrapper {
           private PathWeight weigth;
           private NodeInfo node;
           public NodeWrapper(NodeInfo aNode, PathWeight aWeight) {
  @@ -319,14 +381,6 @@
           public int hashCode() {
               return node.hashCode();
           }
  -        public void writeExternal(ObjectOutput out) throws IOException {
  -            out.writeObject(weigth);
  -            out.writeObject(node);
  -        }
  -        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
  -            weigth = (PathWeight) in.readObject();
  -            node = (NodeInfo) in.readObject();
  -        }
       }
       
       /**
  @@ -380,6 +434,26 @@
               weigthTwoToOne = (PathWeight) in.readObject();
               nodeOne = (NodeInfo) in.readObject();
               nodeTwo = (NodeInfo) in.readObject();
  +        }
  +    }
  +
  +    private static class TwoNodeInfo {
  +        private final NodeInfo nodeOne;
  +        private final NodeInfo nodeTwo;
  +        private TwoNodeInfo(NodeInfo aNodeOne, NodeInfo aNodeTwo) {
  +            nodeOne = aNodeOne;
  +            nodeTwo = aNodeTwo;
  +        }
  +        public int hashCode() {
  +            return nodeOne.hashCode() * nodeTwo.hashCode();
  +        }
  +        public boolean equals(Object obj) {
  +            if ( false == obj instanceof TwoNodeInfo ) {
  +                return false;
  +            }
  +            TwoNodeInfo other = (TwoNodeInfo) obj;
  +            return nodeOne.equals(other.nodeOne) &&
  +                nodeTwo.equals(other.nodeTwo);
           }
       }
       
  
  
  
  1.5       +62 -1     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutputStream.java
  
  Index: StreamOutputStream.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutputStream.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- StreamOutputStream.java	16 Mar 2004 14:48:59 -0000	1.4
  +++ StreamOutputStream.java	18 Mar 2004 12:14:05 -0000	1.5
  @@ -20,7 +20,10 @@
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.ObjectOutputStream;
  +import java.io.ObjectStreamClass;
   import java.io.OutputStream;
  +import java.util.HashMap;
  +import java.util.Map;
   
   /**
    * This is the counterpart of StreamInputStream.
  @@ -31,7 +34,40 @@
       extends ObjectOutputStream
   {
   
  +    /**
  +     * First byte written by writeClassDescriptor to indicate that the
  +     * ObjectStreamClass to be written is new.
  +     * <BR>
  +     * StreamOutputStream writes just after this byte an int, which will be
  +     * used subsequently to send the same ObjectStreamClass (having the same
  +     * SUID).
  +     * <BR>
  +     * After this identifier, the actual ObjectStreamClass is written.
  +     */
  +    public static final byte NOT_CACHED = 0x01;
  +    
  +    /**
  +     * First byte written by writeClassDescriptor to indicate that the
  +     * ObjectStreamClass to be written has already been provided.
  +     * <BR>
  +     * StreamOutputStream writes just after this byte an int, which is the 
  +     * identifier of the ObjectStreamClass being written.\
  +     * <BR>
  +     * See NOT_CACHED for more details.
  +     */
  +    public static final byte CACHED = 0x02;
  +
  +    /**
  +     * Used to generate identifiers for cached ObjectStreamClasses.
  +     */
  +    private int seqID;
  +    
       private final StreamManager streamManager;
  +
  +    /**
  +     * ClassDescriptors to Integer map.
  +     */
  +    private final Map classDescCache;
       
       public StreamOutputStream(OutputStream anOut, StreamManager aManager)
           throws IOException {
  @@ -41,6 +77,8 @@
               throw new IllegalArgumentException("StreamManager is required.");
           }
           streamManager = aManager;
  +        classDescCache = new HashMap();
  +        seqID = 0;
       }
   
       public void writeStream(InputStream aStream) throws IOException {
  @@ -61,6 +99,29 @@
        * It is critical to avoid to write 
        */
       protected void writeStreamHeader() throws IOException {}
  +    
  +    /**
  +     * ObjectStreamClasses are not systematically written to the stream.
  +     * Instead, the very first time that a given ObjectStreamClass needs to be
  +     * written, this implementation assigns it an identifier.
  +     * <BR>
  +     * This latter will be written for all the remaining requests.
  +     */
  +    protected void writeClassDescriptor(ObjectStreamClass desc)
  +        throws IOException {
  +        Long descKey = new Long(desc.getSerialVersionUID()); 
  +        Integer id = (Integer) classDescCache.get(descKey);
  +        if ( null == id ) {
  +            id = new Integer(seqID++);
  +            classDescCache.put(descKey, id);
  +            write(NOT_CACHED);
  +            writeInt(id.intValue());
  +            super.writeClassDescriptor(desc);
  +        } else {
  +            write(CACHED);
  +            writeInt(id.intValue());
  +        }
  +    }
       
       protected Object replaceObject(Object obj) throws IOException {
           if ( obj instanceof InputStream ) {
  
  
  
  1.5       +33 -1     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInputStream.java
  
  Index: StreamInputStream.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInputStream.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- StreamInputStream.java	16 Mar 2004 14:48:59 -0000	1.4
  +++ StreamInputStream.java	18 Mar 2004 12:14:05 -0000	1.5
  @@ -20,7 +20,10 @@
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.ObjectInputStream;
  +import java.io.ObjectStreamClass;
   import java.io.StreamCorruptedException;
  +import java.util.ArrayList;
  +import java.util.List;
   
   /**
    * Allows to read an InputStream from the underlying InputStream. More
  @@ -35,6 +38,11 @@
   {
   
       /**
  +     * ClassDescriptors cache.
  +     */
  +    private final List classDescCache;
  +    
  +    /**
        * StreamManager to be used to retrieve InputStream encoded in the
        * underlying InputStream.
        */
  @@ -56,6 +64,7 @@
               throw new IllegalArgumentException("StreamManager is required.");
           }
           streamManager = aManager;
  +        classDescCache = new ArrayList();
       }
   
       /**
  @@ -102,6 +111,29 @@
        */
       protected void readStreamHeader()
           throws IOException, StreamCorruptedException {}
  +    
  +    /**
  +     * Counterpart of StreamOutputStream.writeClassDescriptor.
  +     */
  +    protected ObjectStreamClass readClassDescriptor()
  +        throws IOException, ClassNotFoundException {
  +        ObjectStreamClass streamClass;
  +        int type = read();
  +        if ( StreamOutputStream.NOT_CACHED == type ) {
  +            int id = readInt();
  +            streamClass = super.readClassDescriptor();
  +            classDescCache.add(id, streamClass);
  +        } else {
  +            int id = readInt();
  +            try {
  +                streamClass = (ObjectStreamClass) classDescCache.get(id);
  +            } catch (IndexOutOfBoundsException e) {
  +                throw new StreamCorruptedException(
  +                    "No ObjectStreamClass has the identifier {" + id + "}");
  +            }
  +        }
  +        return streamClass;
  +    }
       
       protected Object resolveObject(Object obj) throws IOException {
           if ( obj instanceof GInputStream ) {
  
  
  
  1.4       +3 -3      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java
  
  Index: MsgQueue.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MsgQueue.java	11 Mar 2004 15:36:14 -0000	1.3
  +++ MsgQueue.java	18 Mar 2004 12:14:05 -0000	1.4
  @@ -68,7 +68,7 @@
               queue.put(aMessage);
           } catch (InterruptedException e) {
               log.error(e);
  -            throw new RuntimeException(e);
  +            throw new MsgInterceptorStoppedException(e);
           }
           log.trace("Message added to queue {" + name + "}");
       }
  @@ -84,7 +84,7 @@
               message = (Msg) queue.take();
           } catch (InterruptedException e) {
               log.error(e);
  -            throw new RuntimeException(e);
  +            throw new MsgInterceptorStoppedException(e);
           }
           log.trace("Message removed from queue {" + name + "}");
           return message;
  
  
  
  1.2       +2 -12     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgCopier.java
  
  Index: MsgCopier.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgCopier.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MsgCopier.java	25 Feb 2004 13:36:15 -0000	1.1
  +++ MsgCopier.java	18 Mar 2004 12:14:05 -0000	1.2
  @@ -54,11 +54,6 @@
       private final CopierListener listener;
       
       /**
  -     * Indicates if this Processor is started. 
  -     */
  -    private volatile boolean isStarted;
  -    
  -    /**
        * Creates a copier which copies anIn to anOut and notifies aListener.
        * 
        * @param anIn Source.
  @@ -91,13 +86,12 @@
           } else {
               listener = aListener;
           }
  -        isStarted = true;
       }
   
       public void run() {
           listener.onStart();
           try {
  -            while ( isStarted ) {
  +            while ( true ) {
                   copy();
               }
           } catch (Throwable e) {
  @@ -108,10 +102,6 @@
           }
       }
   
  -    public void release() {
  -        isStarted = false;
  -    }
  -    
       /**
        * Actual copy.
        */
  
  
  
  1.3       +10 -14    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/HeaderReactor.java
  
  Index: HeaderReactor.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/HeaderReactor.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- HeaderReactor.java	3 Mar 2004 13:10:07 -0000	1.2
  +++ HeaderReactor.java	18 Mar 2004 12:14:05 -0000	1.3
  @@ -50,11 +50,6 @@
       private final Processors processors;
       
       /**
  -     * Indicates if this Processor is running.
  -     */
  -    private volatile boolean isStarted; 
  -    
  -    /**
        * Dispatch the provided Header blocks.
        * 
        * @param anIn Header blocks to be dispatched.
  @@ -70,7 +65,6 @@
           actualIn = anIn;
           connectors = new HashMap();
           processors = aProcessors;
  -        isStarted = true;
       }
   
       /**
  @@ -98,20 +92,22 @@
       }
       
       public void run() {
  -        while (isStarted) {
  -            dispatch();
  +        try {
  +            while ( true ) {
  +                dispatch();
  +            }
  +        } catch (MsgInterceptorStoppedException e) {
  +            log.info("Stopping HeaderReactor", e);
  +            return;
           }
       }
   
  -    public void release() {
  -        isStarted = false;
  -    }
  -    
       /**
        * Dispatches Msgs to the relevant Connector.
        */
       private void dispatch() {
  -        final Msg msg = actualIn.pop();
  +        final Msg msg;
  +        msg = actualIn.pop();
           Object opaque = actualIn.getHeader();
           final Connector connector;
           synchronized (connectors) {
  
  
  
  1.2       +9 -5      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Processors.java
  
  Index: Processors.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Processors.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Processors.java	25 Feb 2004 13:36:15 -0000	1.1
  +++ Processors.java	18 Mar 2004 12:14:05 -0000	1.2
  @@ -42,8 +42,6 @@
        */
       private final String name;
       
  -    private int idSeq;
  -    
       /**
        * Create a Processor pool.
        * 
  @@ -56,12 +54,11 @@
               throw new IllegalArgumentException("Name is required.");
           }
           executor = new PooledExecutor();
  -        idSeq = 0;
           name = aName;
           executor.setThreadFactory(
               new ThreadFactory() {
                   public Thread newThread(Runnable arg0) {
  -                    Thread thread = new Thread(arg0, name + "-" + idSeq);
  +                    Thread thread = new Thread(arg0, name);
                       thread.setDaemon(true);
                       return thread;
                   }
  @@ -83,6 +80,13 @@
           } catch (InterruptedException e) {
               log.error(e);
           }
  +    }
  +    
  +    /**
  +     * Interrupts the Processors currently executed by this instance. 
  +     */
  +    public void stop() {
  +        executor.interruptAll();
       }
       
   }
  
  
  
  1.2       +2 -8      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Processor.java
  
  Index: Processor.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Processor.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Processor.java	25 Feb 2004 13:36:15 -0000	1.1
  +++ Processor.java	18 Mar 2004 12:14:05 -0000	1.2
  @@ -18,16 +18,10 @@
   package org.apache.geronimo.datastore.impl.remote.messaging;
   
   /**
  - * Runnable task which can be released.
  + * Runnable task.
    *
    * @version $Revision$ $Date$
    */
   public interface Processor extends Runnable
   {
  -    
  -    /**
  -     * Releases the task.
  -     */
  -    public void release();
  -    
   }
  
  
  
  1.6       +10 -12    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java
  
  Index: ServerProcessors.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- ServerProcessors.java	11 Mar 2004 15:36:14 -0000	1.5
  +++ ServerProcessors.java	18 Mar 2004 12:14:05 -0000	1.6
  @@ -79,6 +79,7 @@
       }
       
       public void stop() {
  +        processors.stop();
       }
       
       /**
  @@ -87,18 +88,19 @@
        */
       private class OutputQueueDispatcher implements Processor {
           
  -        /**
  -         * Is this Processor started.
  -         */
  -        private volatile boolean isStarted = true;
  -        
           public void run() {
               HeaderInInterceptor in =
                   new HeaderInInterceptor(
                       new QueueInInterceptor(server.queueOut),
                       MsgHeaderConstants.DEST_NODES);
  -            while ( isStarted ) {
  -                Msg msg = in.pop();
  +            while ( true ) {
  +                Msg msg;
  +                try {
  +                    msg = in.pop();
  +                } catch (MsgInterceptorStoppedException e) {
  +                    log.info("Stopping OutputQueueDispatcher", e);
  +                    return;
  +                }
                   Object destNode = in.getHeader();
                   MsgOutInterceptor out;
                   if ( destNode instanceof NodeInfo ) {
  @@ -131,10 +133,6 @@
               }
           }
   
  -        public void release() {
  -            isStarted = false;
  -        }
  -        
       }
       
   }
  
  
  
  1.5       +71 -7     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java
  
  Index: MetaConnection.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- MetaConnection.java	16 Mar 2004 14:48:59 -0000	1.4
  +++ MetaConnection.java	18 Mar 2004 12:14:05 -0000	1.5
  @@ -21,7 +21,10 @@
   import java.io.InputStream;
   import java.io.OutputStream;
   import java.net.Socket;
  +import java.util.ArrayList;
  +import java.util.Collection;
   import java.util.HashMap;
  +import java.util.Iterator;
   import java.util.Map;
   
   import org.apache.commons.logging.Log;
  @@ -40,12 +43,17 @@
       /**
        * Node owning this connection.
        */
  -    private ServerNode node;
  +    private final ServerNode node;
       
       /**
        * NodeInfo to Connection map.
        */
  -    private Map connections;
  +    private final Map connections;
  +    
  +    /**
  +     * Connections opened by MetaConnection.
  +     */
  +    private final Collection openedConnections;
       
       /**
        * Node topology to be used to derive the most appropriate path to reach
  @@ -57,7 +65,7 @@
        * Logical compression and decompression applied on Msgs pushed and poped 
        * by this meta-connection.
        */
  -    private LogicalCompression logicalComp;
  +    private final LogicalCompression logicalComp;
       
       /**
        * Creates a meta-connection for the specified node.
  @@ -71,6 +79,7 @@
           node = aNode;
           connections = new HashMap();
           logicalComp = new LogicalCompression();
  +        openedConnections = new ArrayList();
       }
       
       /**
  @@ -204,6 +213,7 @@
               throw new CommunicationException(
                   otherNodeInfo + " already registered");
           }
  +        connection.nodeInfo = otherNodeInfo;
           synchronized(connections) {
               connections.put(otherNodeInfo, connection);
           }
  @@ -248,6 +258,9 @@
           synchronized (connections) {
               connections.put(aNodeInfo, connection);
           }
  +        synchronized (openedConnections) {
  +            openedConnections.add(connection);
  +        }
           popConnection(connection);
       }
       
  @@ -270,10 +283,29 @@
           synchronized (connections) {
               connection = (Connection) connections.remove(aNodeInfo);
           }
  +        synchronized (openedConnections) {
  +            openedConnections.remove(connection);
  +        }
           connection.close();
       }
   
       /**
  +     * Stops the connections. 
  +     */
  +    public void stop() {
  +        // TODO Remote nodes which have join the node owning this
  +        // meta-connection need to be notified in order to close on their side
  +        // the connection.
  +        synchronized(openedConnections) {
  +            for (Iterator iter = openedConnections.iterator(); iter.hasNext();) {
  +                Connection connection = (Connection) iter.next();
  +                connection.close();
  +                iter.remove();
  +            }
  +        }
  +    }
  +    
  +    /**
        * Pops the input stream of the connection and fills in the inbound
        * Msg queue with Msgs sent to this node. Otherwise, fills in the
        * outbound queue with Msgs sent to other nodes, which are proxied by this
  @@ -298,7 +330,7 @@
               }
           };
           MsgCopier copier = new MsgCopier(
  -                aConnection.in, out, aConnection.listener);
  +            aConnection.in, out, aConnection.listener);
           node.processors.execute(copier);
       }
   
  @@ -326,6 +358,9 @@
            */
           private final static byte TOPOLOGY = 0x01;
           
  +        private final static byte REQUEST = 0x01;
  +        private final static byte RESPONSE = 0x02;
  +        
           public Object beforePop(StreamInputStream anIn)
               throws IOException {
               byte type = anIn.readByte(); 
  @@ -335,13 +370,21 @@
               if ( null == topology ) {
                   throw new IllegalArgumentException("No topology is defined.");
               }
  -            Object[] result = new Object[2];
  +            Object[] result = new Object[4];
               int id = anIn.readInt();
               NodeInfo nodeInfo = topology.getNodeById(id);
               result[0] = nodeInfo;
               id = anIn.readInt();
               nodeInfo = topology.getNodeById(id);
               result[1] = nodeInfo;
  +            int bodyType = anIn.read();
  +            if ( REQUEST == bodyType ) {
  +                result[2] = MsgBody.Type.REQUEST;
  +            } else {
  +                result[2] = MsgBody.Type.RESPONSE;
  +            }
  +            int reqID = anIn.readInt();
  +            result[3] = new RequestSender.RequestID(reqID);
               return result;
           }
           public void afterPop(StreamInputStream anIn, Msg aMsg, Object anOpaque)
  @@ -354,6 +397,8 @@
               header.addHeader(MsgHeaderConstants.SRC_NODE, prePop[0]);
               header.addHeader(MsgHeaderConstants.DEST_NODE, prePop[1]);
               header.addHeader(MsgHeaderConstants.DEST_NODES, prePop[1]);
  +            header.addHeader(MsgHeaderConstants.BODY_TYPE, prePop[2]);
  +            header.addHeader(MsgHeaderConstants.CORRELATION_ID, prePop[3]);
           }
           
           public Object beforePush(StreamOutputStream anOut, Msg aMsg)
  @@ -375,6 +420,16 @@
               // DEST_NODE.
               header.resetHeader(MsgHeaderConstants.DEST_NODES);
               anOut.writeInt(id);
  +            MsgBody.Type type  = (MsgBody.Type)
  +                header.resetHeader(MsgHeaderConstants.BODY_TYPE);
  +            if ( type == MsgBody.Type.REQUEST ) {
  +                anOut.write(REQUEST);
  +            } else {
  +                anOut.write(RESPONSE);
  +            }
  +            RequestSender.RequestID reqID  = (RequestSender.RequestID)
  +                header.resetHeader(MsgHeaderConstants.CORRELATION_ID);
  +            anOut.writeInt(reqID.getID());
               return null;
           }
           public void afterPush(StreamOutputStream anOut, Msg aMsg,
  @@ -413,6 +468,11 @@
           private final Object endReleaser = new Object();
           
           /**
  +         * NodeInfo of the client.
  +         */
  +        private NodeInfo nodeInfo;
  +        
  +        /**
            * Creates a connection wrapping the provided input and output streams.
            *  
            * @param anIn InputStream of the connection.
  @@ -450,6 +510,7 @@
               if ( null == aNodeInfo ) {
                   throw new IllegalArgumentException("NodeInfo is required.");
               }
  +            nodeInfo  = aNodeInfo;
               Socket socket =
                   new Socket(aNodeInfo.getAddress(), aNodeInfo.getPort());
               
  @@ -479,6 +540,9 @@
                   rawOut.close();
               } catch (IOException e) {
                   log.error("Can not close output", e);
  +            }
  +            synchronized(connections) {
  +                connections.remove(nodeInfo);
               }
               synchronized(endReleaser) {
                   endReleaser.notify();
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgInterceptorStoppedException.java
  
  Index: MsgInterceptorStoppedException.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  /**
   * Exception thrown when a Msg interceptor chain is stopped.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/18 12:14:05 $
   */
  public class MsgInterceptorStoppedException extends RuntimeException {
  
      public MsgInterceptorStoppedException(Throwable aNested) {
          super(aNested);
      }
      
  }