You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/03/16 15:48:59 UTC

cvs commit: incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging IndexedMapTest.java

gdamour     2004/03/16 06:48:59

  Modified:    sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        GInputStream.java StreamOutInterceptor.java
                        StreamInInterceptor.java RequestSender.java
                        Topology.java StreamOutputStream.java
                        StreamInputStream.java MsgHeader.java NodeInfo.java
                        MetaConnection.java
  Added:       sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        IndexedMap.java
               sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
                        IndexedMapTest.java
  Log:
  o NodeInfos are no longer sent across the wire when passing Msgs
  between nodes. Identifiers allocated by Topology are used instead.
  o Round of refactoring: StreamInputStream and
  StreamOutputStream now extend ObjectInputStream and
  ObjectOutputStream respectively.
  
  Revision  Changes    Path
  1.3       +3 -5      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/GInputStream.java
  
  Index: GInputStream.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/GInputStream.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- GInputStream.java	11 Mar 2004 15:36:14 -0000	1.2
  +++ GInputStream.java	16 Mar 2004 14:48:58 -0000	1.3
  @@ -72,15 +72,13 @@
        * Serializes the wrapped InputStream.
        */
       public void writeExternal(ObjectOutput anOut) throws IOException {
  -        StreamOutputStream.CustomObjectOutputStream objOut =
  -            (StreamOutputStream.CustomObjectOutputStream) anOut;
  +        StreamOutputStream objOut = (StreamOutputStream) anOut;
           objOut.writeStream(content);
           objOut.flush();
       }
   
       public void readExternal(ObjectInput anIn) throws IOException, ClassNotFoundException {
  -        StreamInputStream.CustomObjectInputStream objIn =
  -            (StreamInputStream.CustomObjectInputStream) anIn;
  +        StreamInputStream objIn = (StreamInputStream) anIn;
           content = objIn.readStream();
       }
   
  
  
  
  1.3       +72 -7     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutInterceptor.java
  
  Index: StreamOutInterceptor.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutInterceptor.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- StreamOutInterceptor.java	11 Mar 2004 15:36:14 -0000	1.2
  +++ StreamOutInterceptor.java	16 Mar 2004 14:48:58 -0000	1.3
  @@ -38,7 +38,12 @@
        * Buffer size to be applied on top of the OutputStream when writting.
        */
       private static final int BUFFER_SIZE = 2048; 
  -    
  +
  +    /**
  +     * Null PushSynchronization.
  +     */
  +    private static final PushSynchronization NULL_PUSH_SYNC =
  +        new PushSynchronizationAdaptor();
       
       private static final Log log = LogFactory.getLog(StreamOutInterceptor.class);
       
  @@ -57,7 +62,11 @@
        * actual OutputStream.
        */
       private final ByteArrayOutputStream memOut;
  -    
  +
  +    /**
  +     * Push synchronization to be applied by this instance.
  +     */
  +    private final PushSynchronization pushSynchronization;
       
       /**
        * Pushes Msgs to an OutputStream. Msgs are written by a StreamOutputStream
  @@ -67,20 +76,31 @@
        * @param anOut OutputStream to write to.
        * @param aManager Used to encode InputStream.
        */
  -    public StreamOutInterceptor(OutputStream anOut, StreamManager aManager) {
  +    public StreamOutInterceptor(OutputStream anOut, StreamManager aManager,
  +        PushSynchronization aSerialization)
  +        throws IOException {
           if ( null == anOut ) {
               throw new IllegalArgumentException("OutputStream is required.");
           } else if ( null == aManager ) {
               throw new IllegalArgumentException("StreamManager is required.");
           }
           out = new BufferedOutputStream(anOut, BUFFER_SIZE);
  -        memOut = new ByteArrayOutputStream();
  +        memOut = new ByteArrayOutputStream(BUFFER_SIZE);
           streamOutputStream = new StreamOutputStream(memOut, aManager);
  +        if ( null == aSerialization ) {
  +            pushSynchronization = NULL_PUSH_SYNC; 
  +        } else {
  +            pushSynchronization = aSerialization;
  +        }
       }
       
       public void push(Msg aMessage) {
           try {
  +            Object opaque =
  +                pushSynchronization.beforePush(streamOutputStream, aMessage);
               streamOutputStream.writeObject(aMessage);
  +            pushSynchronization.afterPush(streamOutputStream, aMessage, opaque);
  +            streamOutputStream.reset();
               streamOutputStream.flush();
           } catch (IOException e) {
               log.error(e);
  @@ -88,7 +108,7 @@
           }
           synchronized(out) {
               try {
  -                out.write(memOut.toByteArray());
  +                memOut.writeTo(out);
                   out.flush();
               } catch (IOException e) {
                   log.error(e);
  @@ -97,5 +117,50 @@
           }
           memOut.reset();
       }
  -
  +    
  +    /**
  +     * Allows an implementation to be notified when a Msg is about to be
  +     * pushed.
  +     */
  +    public interface PushSynchronization {
  +        /**
  +         * Notifies the implementation that a Msg is being pushed.
  +         * <BR>
  +         * This method is called before the actual push of the Msg.
  +         * 
  +         * @param anOut Used to write information before the Msg itself.
  +         * @param aMsg Msg being pushed. 
  +         * @return Opaque object which is passed back to this instance via
  +         * afterPush. It can be used to pass information between a beforePush
  +         * and an afterPush call.
  +         * @throws IOException Indicates that an I/O error has occurred.
  +         */
  +        public Object beforePush(StreamOutputStream anOut, Msg aMsg)
  +            throws IOException;
  +        /**
  +         * Notifies the implementation that a Msg has been pushed.
  +         * 
  +         * @param anOut Used to write information after the Msg itself.
  +         * @param aMsg Msg which has just been pushed.
  +         * @param anOpaque Value returned by beforePush.
  +         * @throws IOException Indicates that an I/O error has occurred.
  +         */
  +        public void afterPush(StreamOutputStream anOut, Msg aMsg,
  +            Object anOpaque)
  +            throws IOException;
  +    }
  +    
  +    /**
  +     * PushSynchronization adaptor.
  +     */
  +    public static class PushSynchronizationAdaptor
  +        implements PushSynchronization {
  +        public Object beforePush(StreamOutputStream anOut, Msg aMsg)
  +            throws IOException {
  +            return null;
  +        }
  +        public void afterPush(StreamOutputStream anOut, Msg aMsg,
  +            Object anOpaque) throws IOException {}
  +    }
  +    
   }
  
  
  
  1.3       +77 -3     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInInterceptor.java
  
  Index: StreamInInterceptor.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInInterceptor.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- StreamInInterceptor.java	11 Mar 2004 15:36:14 -0000	1.2
  +++ StreamInInterceptor.java	16 Mar 2004 14:48:58 -0000	1.3
  @@ -39,6 +39,17 @@
       private static final int BUFFER_SIZE = 2048; 
       
       private static final Log log = LogFactory.getLog(StreamInInterceptor.class);
  +
  +    /**
  +     * Null PopSynchronization.
  +     */
  +    private static final PopSynchronization NULL_POP_SYNC =
  +        new PopSynchronizationAdaptor();
  +    
  +    /**
  +     * Raw InputStream to pop Msgs from.
  +     */
  +    private final InputStream rawIn;
       
       /**
        * InputStream to pop Msgs from.
  @@ -51,27 +62,45 @@
       private final StreamInputStream streamInputStream;
       
       /**
  +     * Pop synchronization to be applied by this instance.
  +     */
  +    private final PopSynchronization popSynchronization;
  +    
  +    /**
        * Pops Msgs from an InputStream. Msgs are read by a StreamInputStream
        * using the provided StreamManager to resolve InputStream encoded in the
        * raw InputStream.
        * 
        * @param anIn InputStream to read from.
        * @param aManager Used to resolve encoded InputStream.
  +     * @param aSynchronization PopSynchronization applied by this instance.
        */
  -    public StreamInInterceptor(InputStream anIn, StreamManager aManager) {
  +    public StreamInInterceptor(InputStream anIn, StreamManager aManager,
  +        PopSynchronization aSynchronization)
  +        throws IOException {
           if ( null == anIn ) {
               throw new IllegalArgumentException("InputStream is required.");
           } else if ( null == aManager ) {
               throw new IllegalArgumentException("StreamManager is required.");
           }
  +        rawIn = anIn;
           in = new BufferedInputStream(anIn, BUFFER_SIZE);
           streamInputStream = new StreamInputStream(in, aManager);
  +        if ( null == aSynchronization ) {
  +            popSynchronization = NULL_POP_SYNC;
  +        } else {
  +            popSynchronization = aSynchronization;
  +        }
       }
       
       public Msg pop() {
           Msg msg;
           try {
  -            msg = (Msg) streamInputStream.readObject();
  +            synchronized (rawIn) {
  +                Object opaque = popSynchronization.beforePop(streamInputStream);
  +                msg = (Msg) streamInputStream.readObject();
  +                popSynchronization.afterPop(streamInputStream, msg, opaque);
  +            }
           } catch (ClassNotFoundException e) {
               log.error(e);
               throw new RuntimeException(e);
  @@ -82,4 +111,49 @@
           return msg;
       }
   
  +    /**
  +     * Allows an implementation to be notified when a Msg is about to be
  +     * popped.
  +     */
  +    public interface PopSynchronization {
  +        /**
  +         * Notifies the implementation that a Msg is being popped.
  +         * <BR>
  +         * This method is called before the actual pop of the Msg.
  +         * 
  +         * @param anIn Used to read information from the input stream before
  +         * the Msg itself. 
  +         * @return Opaque object which is passed back to this instance via
  +         * afterPop. It can be used to pass information between a beforePop
  +         * and an afterPop call.
  +         * @throws IOException Indicates that an I/O error has occurred.
  +         */
  +        public Object beforePop(StreamInputStream anIn)
  +            throws IOException ;
  +        
  +        /**
  +         * Notifies the implementation that a Msg has been popped.
  +         * 
  +         * @param anIn Used to read information from the input stream after
  +         * the Msg itself. 
  +         * @param aMsg Msg which has just been popped.
  +         * @param anOpaque Value returned by beforePop.
  +         * @throws IOException Indicates that an I/O error has occurred.
  +         */
  +        public void afterPop(StreamInputStream anIn, Msg aMsg, Object anOpaque)
  +            throws IOException;
  +    }
  +    
  +    /**
  +     * PopSynchronization adaptor.
  +     */
  +    public static class PopSynchronizationAdaptor implements PopSynchronization {
  +        public Object beforePop(StreamInputStream anIn)
  +            throws IOException {
  +            return null;
  +        }
  +        public void afterPop(StreamInputStream anIn, Msg aMsg,
  +            Object anOpaque) throws IOException {}
  +    }
  +    
   }
  
  
  
  1.4       +22 -37    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java
  
  Index: RequestSender.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- RequestSender.java	11 Mar 2004 15:36:14 -0000	1.3
  +++ RequestSender.java	16 Mar 2004 14:48:59 -0000	1.4
  @@ -22,8 +22,6 @@
   import java.io.ObjectInput;
   import java.io.ObjectOutput;
   import java.lang.reflect.InvocationTargetException;
  -import java.util.HashMap;
  -import java.util.Map;
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  @@ -56,7 +54,7 @@
       /**
        * Request id to FuturResult map.
        */
  -    private final Map responses;
  +    private final IndexedMap responses;
   
       /**
        * Node using this sender.
  @@ -77,7 +75,7 @@
               throw new IllegalArgumentException("Node is required.");
           }
           srcNode = aSrcNode;
  -        responses = new HashMap();
  +        responses = new IndexedMap(1024);
       }
   
       /**
  @@ -106,9 +104,8 @@
           Msg msg = new Msg();
           
           MsgHeader header = msg.getHeader();
  -        Object id = createID(aTargetNodes);
  +        RequestID id = createID(aTargetNodes);
           header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
  -        header.addHeader(MsgHeaderConstants.SRC_NODE, srcNode);
           header.addHeader(MsgHeaderConstants.DEST_NODES, aTargetNodes);
           
           MsgBody body = msg.getBody();
  @@ -130,16 +127,15 @@
        * @param aTargetNodes Nodes to which the request is to be sent.
        * @return Request identifier.
        */
  -    private Object createID(NodeInfo[] aTargetNodes) {
  +    private RequestID createID(NodeInfo[] aTargetNodes) {
           RequestID id;
  -        synchronized (responses) {
  -            id = new RequestID(requestIDSeq++);
  -            FutureResult[] results = new FutureResult[aTargetNodes.length];
  -            for (int i = 0; i < results.length; i++) {
  -                results[i] = new FutureResult();
  -            }
  -            responses.put(id, results);
  +        int mapID = responses.allocateId();
  +        id = new RequestID(mapID);
  +        FutureResult[] results = new FutureResult[aTargetNodes.length];
  +        for (int i = 0; i < results.length; i++) {
  +            results[i] = new FutureResult();
           }
  +        responses.put(mapID, results);
           return id;
       }
       
  @@ -150,11 +146,9 @@
        * @param aWaitTime number of milliseconds to wait for a response.
        * @return Result of the request.
        */
  -    private CommandResult waitResponse(Object anID, long aWaitTime) {
  -        FutureResult[] results;
  -        synchronized(responses) {
  -            results = (FutureResult[]) responses.get(anID);
  -        }
  +    private CommandResult waitResponse(RequestID anID, long aWaitTime) {
  +        FutureResult[] results =
  +            (FutureResult[]) responses.get(anID.id);
           Exception ex;
           try {
               CommandResult returned = null;
  @@ -163,9 +157,7 @@
                   returned = (CommandResult) results[i].get();
                   // CommandResult returned = (CommandResult) result.timedGet(aWaitTime);
               }
  -            synchronized(responses) {
  -                responses.remove(anID);
  -            }
  +            responses.remove(anID.id);
               return returned;
           } catch (TimeoutException e) {
               log.error(e);
  @@ -187,10 +179,12 @@
        * @param aResult Response
        */
       public void setResponse(Object anID, CommandResult aResult) {
  -        FutureResult[] results;
  -        synchronized(responses) {
  -            results = (FutureResult[]) responses.get(anID);
  +        if ( false == anID instanceof RequestID ) {
  +            throw new IllegalArgumentException("ID is of the wrong type.");
           }
  +        RequestID id = (RequestID) anID;
  +        FutureResult[] results;
  +        results = (FutureResult[]) responses.get(id.id);
           for (int i = 0; i < results.length; i++) {
               FutureResult result = results[i];
               if ( null == result.peek() ) {
  @@ -209,23 +203,14 @@
            * Required for Externalization.
            */
           public RequestID() {}
  -        public RequestID(int anId) {
  -            id = anId;
  +        public RequestID(int anID) {
  +            id = anID;
           }
           public void writeExternal(ObjectOutput out) throws IOException {
               out.writeInt(id);
           }
           public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
               id = in.readInt();
  -        }
  -        public boolean equals(Object obj) {
  -            if ( false == obj instanceof RequestID ) {
  -                return false;
  -            }
  -            return id == ((RequestID)obj).id;
  -        }
  -        public int hashCode() {
  -            return id;
           }
       }
       
  
  
  
  1.2       +105 -35   incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Topology.java
  
  Index: Topology.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Topology.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Topology.java	11 Mar 2004 15:36:14 -0000	1.1
  +++ Topology.java	16 Mar 2004 14:48:59 -0000	1.2
  @@ -48,9 +48,36 @@
        * NodeInfo to PathWeight map.
        */
       private Map nodeToPaths;
  +
  +    private Object nodeAndIdLock = new Object();
  +    private Map nodeToID;
  +    private Map idToNode;
  +    
  +    private static int seqID = 0;
       
       public Topology() {
           nodeToPaths = new HashMap();
  +        nodeToID = new HashMap();
  +        idToNode = new HashMap();
  +    }
  +
  +    private void registerNode(NodeInfo aNodeInfo) {
  +        synchronized(nodeAndIdLock) {
  +            if ( null == nodeToID.get(aNodeInfo) ) {
  +                Integer id = new Integer(seqID++);
  +                nodeToID.put(aNodeInfo, id);
  +                idToNode.put(id, aNodeInfo);
  +            }
  +        }
  +    }
  +    
  +    private void unregisterNode(NodeInfo aNodeInfo) {
  +        synchronized(nodeAndIdLock) {
  +            if ( null == nodeToID.remove(aNodeInfo) ) {
  +                throw new IllegalArgumentException(aNodeInfo +
  +                    " is not registered by this topology.");
  +            }
  +        }
       }
       
       /**
  @@ -64,18 +91,20 @@
               throw new IllegalArgumentException("Path is required.");
           }
           synchronized(nodeToPaths) {
  +            registerNode(aPath.nodeOne);
               Collection related = (Collection) nodeToPaths.get(aPath.nodeOne);
               if ( null == related ) {
                   related = new ArrayList();
                   nodeToPaths.put(aPath.nodeOne, related);
               }
  -            related.add(new Node(aPath.nodeTwo, aPath.weigthOneToTwo));
  +            related.add(new NodeWrapper(aPath.nodeTwo, aPath.weigthOneToTwo));
  +            registerNode(aPath.nodeTwo);
               related = (Collection) nodeToPaths.get(aPath.nodeTwo);
               if ( null == related ) {
                   related = new ArrayList();
                   nodeToPaths.put(aPath.nodeTwo, related);
               }
  -            related.add(new Node(aPath.nodeOne, aPath.weigthTwoToOne));
  +            related.add(new NodeWrapper(aPath.nodeOne, aPath.weigthTwoToOne));
           }
       }
       
  @@ -95,6 +124,7 @@
                       " is not registered by this topology.");
               }
           }
  +        unregisterNode(aNode);
       }
       
       /**
  @@ -112,19 +142,27 @@
                   throw new IllegalArgumentException(aPath.nodeOne +
                       " is not registered by this topology.");
               }
  -            if ( !related.remove(new Node(aPath.nodeTwo, null)) ) {
  +            if ( !related.remove(new NodeWrapper(aPath.nodeTwo, null)) ) {
                   throw new IllegalArgumentException(aPath +
                       " is not registered by this topology.");
               }
  +            if ( 0 == related.size() ) {
  +                unregisterNode(aPath.nodeOne);
  +                nodeToPaths.remove(aPath.nodeOne);
  +            }
               related = (Collection) nodeToPaths.get(aPath.nodeTwo);
               if ( null == related ) {
                   throw new IllegalArgumentException(aPath.nodeTwo +
                       " is not registered by this topology.");
               }
  -            if ( !related.remove(new Node(aPath.nodeOne, null)) ) {
  +            if ( !related.remove(new NodeWrapper(aPath.nodeOne, null)) ) {
                   throw new IllegalArgumentException(aPath +
                       " is not registered by this topology.");
               }
  +            if ( 0 == related.size() ) {
  +                unregisterNode(aPath.nodeTwo);
  +                nodeToPaths.remove(aPath.nodeTwo);
  +            }
           }
       }
   
  @@ -154,10 +192,8 @@
           for (Iterator iter = paths.iterator(); iter.hasNext();index++) {
               int weight = 0;
               List nodeList = (List) iter.next();
  -            for (Iterator iterator = nodeList.iterator();
  -                iterator.hasNext();
  -                ) {
  -                Node node = (Node) iterator.next();
  +            for (Iterator iter2 = nodeList.iterator(); iter2.hasNext();) {
  +                NodeWrapper node = (NodeWrapper) iter2.next();
                   weight += node.weigth.getWeight();
               }
               if ( -1 == minWeight || weight < minWeight ) {
  @@ -166,7 +202,13 @@
               }
           }
           List path = (List) paths.get(minPathIndex);
  -        return (NodeInfo[]) path.toArray(new NodeInfo[0]);
  +        NodeInfo[] result = new NodeInfo[path.size()];
  +        int i = 0;
  +        for (Iterator iter = path.iterator(); iter.hasNext();) {
  +            NodeWrapper wrapper = (NodeWrapper) iter.next();
  +            result[i++] = wrapper.node;
  +        }
  +        return result;
       }
   
       private List getPaths(NodeInfo aSource, NodeInfo aTarget, List aPath,
  @@ -178,22 +220,55 @@
           }
           List returned = new ArrayList();
           for (Iterator iter = related.iterator(); iter.hasNext();) {
  -            Node node = (Node) iter.next();
  -            if ( aPath.contains(node) ) {
  +            NodeWrapper wrapper = (NodeWrapper) iter.next();
  +            if ( aPath.contains(wrapper) ) {
                   continue;
               }
  -            aPath.add(node);
  -            if ( node.equals(aTarget) ) {
  +            aPath.add(wrapper);
  +            if ( wrapper.node.equals(aTarget) ) {
                   returned.add(new ArrayList(aPath));
               } else {
  -                Collection paths = getPaths(node, aTarget, aPath, aNodeToPath);
  +                Collection paths =
  +                    getPaths(wrapper.node, aTarget, aPath, aNodeToPath);
                   returned.addAll(paths);
               }
  -            aPath.remove(node);
  +            aPath.remove(wrapper);
           }
           return returned;
       }
   
  +    /**
  +     * Gets the identifier of the provided node.
  +     * <BR>
  +     * When a node is added to a topology, the topology assigns it an identifier.
  +     * 
  +     * @param aNodeInfo Node whose identifier is to be returned.
  +     * @return Node identifier.
  +     */
  +    public int getIDOfNode(NodeInfo aNodeInfo) {
  +        Integer id;
  +        synchronized(nodeAndIdLock) {
  +            id = (Integer) nodeToID.get(aNodeInfo);
  +        }
  +        if ( null == id ) {
  +            throw new IllegalArgumentException(aNodeInfo +
  +                " is not registered by this topology.");
  +        }
  +        return id.intValue();
  +    }
  +    
  +    public NodeInfo getNodeById(int anId) {
  +        NodeInfo nodeInfo;
  +        synchronized(nodeAndIdLock) {
  +            nodeInfo = (NodeInfo) idToNode.get(new Integer(anId));
  +        }
  +        if ( null == nodeInfo ) {
  +            throw new IllegalArgumentException("Identifier " + anId +
  +                " is not registered by this topology.");
  +        }
  +        return nodeInfo;
  +    }
  +    
       public void writeExternal(ObjectOutput out) throws IOException {
           out.writeObject(nodeToPaths);
       }
  @@ -227,35 +302,30 @@
           }
       }
       
  -    private static class Node
  -        extends NodeInfo
  -        implements Externalizable {
  +    private static class NodeWrapper implements Externalizable {
           private PathWeight weigth;
  -        /**
  -         * Required for Externalization.
  -         */
  -        public Node() {}
  -        public Node(NodeInfo aNode, PathWeight aWeight) {
  -            super(aNode.getName(), aNode.getAddress(), aNode.getPort());
  -            if ( null == aNode ) {
  -                throw new IllegalArgumentException("Node is required");
  -            }
  +        private NodeInfo node;
  +        public NodeWrapper(NodeInfo aNode, PathWeight aWeight) {
  +            node = aNode;
               weigth = aWeight;
           }
           public boolean equals(Object obj) {
  -            return super.equals(obj);
  +            if ( false == obj instanceof NodeWrapper ) {
  +                return false;
  +            }
  +            NodeWrapper other = (NodeWrapper) obj;
  +            return node.equals(other.node);
           }
           public int hashCode() {
  -            return super.hashCode();
  +            return node.hashCode();
           }
           public void writeExternal(ObjectOutput out) throws IOException {
  -            super.writeExternal(out);
               out.writeObject(weigth);
  +            out.writeObject(node);
           }
  -        public void readExternal(ObjectInput in)
  -            throws IOException, ClassNotFoundException {
  -            super.readExternal(in);
  +        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
               weigth = (PathWeight) in.readObject();
  +            node = (NodeInfo) in.readObject();
           }
       }
       
  
  
  
  1.4       +14 -26    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutputStream.java
  
  Index: StreamOutputStream.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutputStream.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- StreamOutputStream.java	11 Mar 2004 15:36:14 -0000	1.3
  +++ StreamOutputStream.java	16 Mar 2004 14:48:59 -0000	1.4
  @@ -17,7 +17,6 @@
   
   package org.apache.geronimo.datastore.impl.remote.messaging;
   
  -import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.io.ObjectOutputStream;
  @@ -29,13 +28,15 @@
    * @version $Revision$ $Date$
    */
   public class StreamOutputStream
  -    extends DataOutputStream
  +    extends ObjectOutputStream
   {
   
       private final StreamManager streamManager;
       
  -    public StreamOutputStream(OutputStream anOut, StreamManager aManager) {
  +    public StreamOutputStream(OutputStream anOut, StreamManager aManager)
  +        throws IOException {
           super(anOut);
  +        enableReplaceObject(true);
           if ( null == aManager ) {
               throw new IllegalArgumentException("StreamManager is required.");
           }
  @@ -47,11 +48,6 @@
           writeObject(id);
       }
   
  -    public void writeObject(Object anObject) throws IOException {
  -        CustomObjectOutputStream objOut = new CustomObjectOutputStream();
  -        objOut.writeObject(anObject);
  -    }
  -    
       /**
        * Gets the StreamManager used to resolve InputStream identifiers.
        * 
  @@ -60,25 +56,17 @@
       public StreamManager getStreamManager() {
           return streamManager;
       }
  -    
  -    public class CustomObjectOutputStream extends ObjectOutputStream {
  -
  -        public CustomObjectOutputStream() throws IOException, SecurityException {
  -            super(StreamOutputStream.this);
  -            enableReplaceObject(true);
  -        }
   
  -        public void writeStream(InputStream aStream) throws IOException {
  -            StreamOutputStream.this.writeStream(aStream);
  -        }
  -        
  -        protected Object replaceObject(Object obj) throws IOException {
  -            if ( obj instanceof InputStream ) {
  -                return new GInputStream((InputStream) obj);
  -            }
  -            return obj;
  +    /**
  +     * It is critical to avoid writing a stream header when this stream is created.
  +     */
  +    protected void writeStreamHeader() throws IOException {}
  +    
  +    protected Object replaceObject(Object obj) throws IOException {
  +        if ( obj instanceof InputStream ) {
  +            return new GInputStream((InputStream) obj);
           }
  -        
  +        return obj;
       }
       
   }
  
  
  
  1.4       +25 -28    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInputStream.java
  
  Index: StreamInputStream.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInputStream.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- StreamInputStream.java	11 Mar 2004 15:36:14 -0000	1.3
  +++ StreamInputStream.java	16 Mar 2004 14:48:59 -0000	1.4
  @@ -17,11 +17,10 @@
   
   package org.apache.geronimo.datastore.impl.remote.messaging;
   
  -import java.io.DataInputStream;
   import java.io.IOException;
   import java.io.InputStream;
  -import java.io.ObjectInput;
   import java.io.ObjectInputStream;
  +import java.io.StreamCorruptedException;
   
   /**
    * Allows to read an InputStream from the underlying InputStream. More
  @@ -32,8 +31,7 @@
    * @version $Revision$ $Date$
    */
   public class StreamInputStream
  -    extends DataInputStream
  -    implements ObjectInput
  +    extends ObjectInputStream
   {
   
       /**
  @@ -50,8 +48,10 @@
        * @param aManager StreamManager.
        * @throws IOException If an I/O error has occured.
        */
  -    public StreamInputStream(InputStream anIn, StreamManager aManager) {
  +    public StreamInputStream(InputStream anIn, StreamManager aManager)
  +        throws IOException {
           super(anIn);
  +        enableResolveObject(true);
           if ( null == aManager ) {
               throw new IllegalArgumentException("StreamManager is required.");
           }
  @@ -67,7 +67,7 @@
        * @throws IOException May indicates that the StreamManager does not know
        * about the encoded identifer.
        */
  -    public InputStream readInputStream() throws IOException {
  +    public InputStream readStream() throws IOException {
           Object id;
           try {
               id = readObject();
  @@ -80,11 +80,6 @@
           return returned;
       }
   
  -    public Object readObject() throws ClassNotFoundException, IOException {
  -        CustomObjectInputStream objIn = new CustomObjectInputStream();
  -        return objIn.readObject();
  -    }
  -    
       /**
        * Gets the StreamManager used to register InputStreams.
        * 
  @@ -94,23 +89,25 @@
           return streamManager;
       }
   
  -    public class CustomObjectInputStream extends ObjectInputStream {
  -
  -        public CustomObjectInputStream() throws IOException, SecurityException {
  -            super(StreamInputStream.this);
  -            enableResolveObject(true);
  -        }
  -        
  -        public InputStream readStream() throws IOException {
  -            return StreamInputStream.this.readInputStream();
  -        }
  -        
  -        protected Object resolveObject(Object obj) throws IOException {
  -            if ( obj instanceof GInputStream ) {
  -                return ((GInputStream)obj).getRawInputStream();
  -            }
  -            return obj;
  +    /**
  +     * It is critical to read nothing during the creation of ObjectInputStream.
  +     * <BR>
  +     * Indeed, StreamInputStream and StreamOutputStream are used by
  +     * StreamInInterceptor and StreamOutInterceptor respectively. It should be
  +     * possible to instantiate these two objects at the same time, for instance
  +     * to wrap the InputStream and OutputStream of a Socket.
  +     * <BR>
  +     * If ObjectInputStream reads a StreamHeader during its creation, then it
  +     * is not possible to create these two classes at the same time.  
  +     */
  +    protected void readStreamHeader()
  +        throws IOException, StreamCorruptedException {}
  +    
  +    protected Object resolveObject(Object obj) throws IOException {
  +        if ( obj instanceof GInputStream ) {
  +            return ((GInputStream)obj).getRawInputStream();
           }
  +        return obj;
       }
       
   }
  
  
  
  1.3       +36 -25    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgHeader.java
  
  Index: MsgHeader.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgHeader.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- MsgHeader.java	11 Mar 2004 15:36:14 -0000	1.2
  +++ MsgHeader.java	16 Mar 2004 14:48:59 -0000	1.3
  @@ -61,6 +61,18 @@
               headers[i] = aHeader.headers[i];
           }
       }
  +
  +    /**
  +     * Resets to null the value of an header.
  +     * 
  +     * @param aKey Header key.
  +     */
  +    public Object resetHeader(Object aKey) {
  +        int idx = getHeaderIndex(aKey);
  +        Object result = headers[idx];
  +        headers[idx] = null;
  +        return result;
  +    }
       
       /**
        * Adds an header.
  @@ -69,16 +81,10 @@
        * @param aValue Header value.
        */
       public void addHeader(Object aKey, Object aValue) {
  -        for (int i = 0; i < headerConstants.length; i++) {
  -            if ( aKey == headerConstants[i] ) {
  -                headers[i] = aValue;
  -                return;
  -            }
  -        }
  -        throw new IllegalArgumentException("Header {" + aKey +
  -            "} is not supported.");
  +        int idx = getHeaderIndex(aKey);
  +        headers[idx] = aValue;
       }
  -
  +    
       /**
        * Gets a required header.
        *  
  @@ -88,18 +94,13 @@
        * does not exist.
        */
       public Object getHeader(Object aKey) {
  -        for (int i = 0; i < headerConstants.length; i++) {
  -            if ( aKey == headerConstants[i] ) {
  -                Object value = headers[i];
  -                if ( null == value ) {
  -                    throw new IllegalArgumentException("Header {" + aKey +
  -                        "} is not set.");
  -                }
  -                return value;
  -            }
  +        int idx = getHeaderIndex(aKey);
  +        Object value = headers[idx];
  +        if ( null == value ) {
  +            throw new IllegalArgumentException("Header {" + aKey +
  +            "} is not set.");
           }
  -        throw new IllegalArgumentException("Header {" + aKey +
  -            "} is not supported.");
  +        return value;
       }
   
       /**
  @@ -109,16 +110,26 @@
        * @return Header value.
        */
       public Object getOptionalHeader(Object aKey) {
  -        Object value;
  +        int idx = getHeaderIndex(aKey);
  +        return headers[idx];
  +    }
  +
  +    /**
  +     * Gets the index of the header having the specified key.
  +     * 
  +     * @param aKey Header key.
  +     * @return header index.
  +     */
  +    private int getHeaderIndex(Object aKey) {
           for (int i = 0; i < headerConstants.length; i++) {
               if ( aKey == headerConstants[i] ) {
  -                return headers[i];
  +                return i;
               }
           }
           throw new IllegalArgumentException("Header {" + aKey +
  -            "} is not supported.");
  +        "} is not supported.");
       }
  -
  +    
       public void writeExternal(ObjectOutput out) throws IOException {
           out.writeObject(headers);
       }
  
  
  
  1.3       +6 -2      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeInfo.java
  
  Index: NodeInfo.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeInfo.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- NodeInfo.java	11 Mar 2004 15:36:14 -0000	1.2
  +++ NodeInfo.java	16 Mar 2004 14:48:59 -0000	1.3
  @@ -52,11 +52,15 @@
        * Pops the first element of the array and returns the resulting array.
        * 
        * @param aNodeInfo Array whose first element is to be poped.
  -     * @return New array.
  +     * @return New array. If the size of aNodeInfo is one, then null is
  +     * returned.
        */
       public static NodeInfo[] pop(NodeInfo[] aNodeInfo) {
           if ( null == aNodeInfo || 0 == aNodeInfo.length) {
               throw new IllegalArgumentException("NodeInfo array is required.");
  +        }
  +        if ( 1 == aNodeInfo.length ) {
  +            return null;
           }
           NodeInfo[] returned = new NodeInfo[aNodeInfo.length-1];
           for (int i = 1; i < aNodeInfo.length; i++) {
  
  
  
  1.4       +113 -11   incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java
  
  Index: MetaConnection.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MetaConnection.java	11 Mar 2004 15:36:14 -0000	1.3
  +++ MetaConnection.java	16 Mar 2004 14:48:59 -0000	1.4
  @@ -54,6 +54,12 @@
       private Topology topology;
       
       /**
  +     * Logical compression and decompression applied on Msgs pushed and poped 
  +     * by this meta-connection.
  +     */
  +    private LogicalCompression logicalComp;
  +    
  +    /**
        * Creates a meta-connection for the specified node.
        * 
        * @param aNode Node.
  @@ -64,6 +70,7 @@
           }
           node = aNode;
           connections = new HashMap();
  +        logicalComp = new LogicalCompression();
       }
       
       /**
  @@ -120,7 +127,6 @@
                   "} is not reachable by {" + node.getNodeInfo() + "}");
           }
           NodeInfo tmpNode = path[0];
  -        NodeInfo[] newPath = NodeInfo.pop(path);
           Connection connection;
           synchronized(connections) {
               connection = (Connection) connections.get(tmpNode);
  @@ -129,14 +135,26 @@
               throw new CommunicationException("{" + aNode +
                   "} is not reachable by {" + node.getNodeInfo() + "}");
           }
  -        return
  +        NodeInfo[] newPath = NodeInfo.pop(path);
  +        MsgOutInterceptor out =
               new HeaderOutInterceptor(
  -                MsgHeaderConstants.DEST_NODES,
  -                aNode,
  +                MsgHeaderConstants.SRC_NODE,
  +                node.getNodeInfo(),
  +                new HeaderOutInterceptor(
  +                    MsgHeaderConstants.DEST_NODE,
  +                    aNode,
  +                    new HeaderOutInterceptor(
  +                        MsgHeaderConstants.DEST_NODES,
  +                        aNode,
  +                        connection.out)));
  +        if ( null != newPath ) {
  +            out =
                   new HeaderOutInterceptor(
                       MsgHeaderConstants.DEST_NODE_PATH,
                       newPath,
  -                    connection.out));
  +                    out);
  +        }
  +        return out; 
       }
       
       /**
  @@ -272,7 +290,7 @@
               public void push(Msg aMsg) {
                   MsgHeader header = aMsg.getHeader();
                   if ( node.getNodeInfo().equals(
  -                    header.getHeader(MsgHeaderConstants.DEST_NODES)) ) {
  +                    header.getHeader(MsgHeaderConstants.DEST_NODE)) ) {
                       inboundOut.push(aMsg);
                   } else {
                       outboundOut.push(aMsg);
  @@ -283,6 +301,86 @@
                   aConnection.in, out, aConnection.listener);
           node.processors.execute(copier);
       }
  +
  +    /**
  +     * Logical compression of the Msgs pushed and poped by this meta-connection.
  +     * <BR>
  +     * Its goal is to remove from Msgs to be serialized the information, shared
  +     * by all the nodes. For instance, as a Topology is shared by all the nodes
  +     * one can replace the NodeInfo instances contained by Msgs by their
  +     * corresponding identifier. 
  +     *
  +     * @version $Revision$ $Date$
  +     */
  +    private class LogicalCompression implements
  +        StreamInInterceptor.PopSynchronization,
  +        StreamOutInterceptor.PushSynchronization {
  +        
  +        /**
  +         * No logical compression.
  +         */
  +        private final static byte NULL = 0x00;
  +        
  +        /**
  +         * Compression based on the Topology shared knowledge.
  +         */
  +        private final static byte TOPOLOGY = 0x01;
  +        
  +        public Object beforePop(StreamInputStream anIn)
  +            throws IOException {
  +            byte type = anIn.readByte(); 
  +            if ( type == NULL ) {
  +                return null;
  +            }
  +            if ( null == topology ) {
  +                throw new IllegalArgumentException("No topology is defined.");
  +            }
  +            Object[] result = new Object[2];
  +            int id = anIn.readInt();
  +            NodeInfo nodeInfo = topology.getNodeById(id);
  +            result[0] = nodeInfo;
  +            id = anIn.readInt();
  +            nodeInfo = topology.getNodeById(id);
  +            result[1] = nodeInfo;
  +            return result;
  +        }
  +        public void afterPop(StreamInputStream anIn, Msg aMsg, Object anOpaque)
  +            throws IOException {
  +            if ( null == anOpaque ) {
  +                return;
  +            }
  +            Object[] prePop = (Object[]) anOpaque;
  +            MsgHeader header = aMsg.getHeader();
  +            header.addHeader(MsgHeaderConstants.SRC_NODE, prePop[0]);
  +            header.addHeader(MsgHeaderConstants.DEST_NODE, prePop[1]);
  +            header.addHeader(MsgHeaderConstants.DEST_NODES, prePop[1]);
  +        }
  +        
  +        public Object beforePush(StreamOutputStream anOut, Msg aMsg)
  +            throws IOException {
  +            if ( null == topology ) {
  +                anOut.writeByte(NULL);
  +                return null;
  +            }
  +            anOut.writeByte(TOPOLOGY);
  +            MsgHeader header = aMsg.getHeader();
  +            NodeInfo info =
  +                (NodeInfo) header.resetHeader(MsgHeaderConstants.SRC_NODE);
  +            int id = topology.getIDOfNode(info);
  +            anOut.writeInt(id);
  +            info =
  +                (NodeInfo) header.resetHeader(MsgHeaderConstants.DEST_NODE);
  +            id = topology.getIDOfNode(info);
  +            // When pushing a Msg on the network, DEST_NODES equals to
  +            // DEST_NODE.
  +            header.resetHeader(MsgHeaderConstants.DEST_NODES);
  +            anOut.writeInt(id);
  +            return null;
  +        }
  +        public void afterPush(StreamOutputStream anOut, Msg aMsg,
  +            Object anOpaque) throws IOException {
  +        }
  +    }
       
       /**
        * Logical connection.
  @@ -330,9 +428,11 @@
               }
               
               rawIn = anIn;
  -            in = new StreamInInterceptor(rawIn, node.getStreamManager());
  +            in = new StreamInInterceptor(rawIn, node.getStreamManager(),
  +                logicalComp);
               rawOut = anOut;
  -            out = new StreamOutInterceptor(rawOut, node.getStreamManager());
  +            out = new StreamOutInterceptor(rawOut, node.getStreamManager(),
  +                logicalComp);
               listener = new MsgCopier.NullCopierListener() {
                   public void onFailure() {
                       close();
  @@ -354,9 +454,11 @@
                   new Socket(aNodeInfo.getAddress(), aNodeInfo.getPort());
               
               rawIn = socket.getInputStream();
  -            in = new StreamInInterceptor(rawIn, node.getStreamManager());
  +            in = new StreamInInterceptor(rawIn, node.getStreamManager(),
  +                logicalComp);
               rawOut = socket.getOutputStream();
  -            out = new StreamOutInterceptor(rawOut, node.getStreamManager());
  +            out = new StreamOutInterceptor(rawOut, node.getStreamManager(),
  +                logicalComp);
               listener = new MsgCopier.NullCopierListener() {
                   public void onFailure() {
                       close();
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/IndexedMap.java
  
  Index: IndexedMap.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  /**
   * @version $Revision: 1.1 $ $Date: 2004/03/16 14:48:59 $
   */
  public class IndexedMap {
  
      private static final Object RESERVED = new Object();
      private final Object[] map;
      private int lastAllocatedIndex;
      
      public IndexedMap(int aSize) {
          if ( aSize < 1 ) {
              throw new IllegalArgumentException("Size is not valid.");
          }
          map = new Object[aSize];
          lastAllocatedIndex = 0;  
      }
      
      public synchronized int allocateId() {
          for (int i=lastAllocatedIndex; i < map.length; i++) {
              if ( null == map[i] ) {
                  lastAllocatedIndex = i;
                  map[i] = RESERVED;
                  return i;
              }
          }
          for (int i=0; i < lastAllocatedIndex; i++) {
              if ( null == map[i] ) {
                  lastAllocatedIndex = i;
                  map[i] = RESERVED;
                  return i;
              }
          }
          throw new RuntimeException("Map is fulled.");
      }
      
      public Object get(int anId) {
          return map[anId];
      }
      
      public void remove(int anId) {
          map[anId] = null;
      }
      
      public void put(int anId, Object anObject) {
          map[anId] = anObject;
      }
      
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/IndexedMapTest.java
  
  Index: IndexedMapTest.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import junit.framework.TestCase;
  
  /**
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/16 14:48:59 $
   */
  public class IndexedMapTest extends TestCase {
  
      private IndexedMap map;
      private int size;
      
      protected void setUp() throws Exception {
          size = 10;
          map = new IndexedMap(size);
      }
      
      public void testAllocate() throws Exception {
          for(int i = 0; i < size; i++) {
              map.allocateId();
          }
          try {
              map.allocateId();
              fail("Map should be full.");
          } catch (RuntimeException e) {
          }
          
      }
      
      public void testGetPut() {
          Object value = new Object();
          int id = map.allocateId();
          map.put(id, value);
          assertEquals(value, map.get(id));
      }
      
  }