You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pi...@apache.org on 2001/07/20 02:00:54 UTC

cvs commit: jakarta-tomcat-4.0/catalina/src/share/org/apache/catalina/connector/warp WarpConnection.java

pier        01/07/19 17:00:54

  Modified:    catalina/src/share/org/apache/catalina/connector/warp
                        WarpConnection.java
  Log:
  New WARP implementation from Jakarta-Tomcat-Connectors
  
  Revision  Changes    Path
  1.8       +137 -260  jakarta-tomcat-4.0/catalina/src/share/org/apache/catalina/connector/warp/WarpConnection.java
  
  Index: WarpConnection.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-4.0/catalina/src/share/org/apache/catalina/connector/warp/WarpConnection.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- WarpConnection.java	2001/01/24 23:10:42	1.7
  +++ WarpConnection.java	2001/07/20 00:00:53	1.8
  @@ -2,7 +2,7 @@
    *                                                                           *
    *                 The Apache Software License,  Version 1.1                 *
    *                                                                           *
  - *         Copyright (c) 1999, 2000  The Apache Software Foundation.         *
  + *          Copyright (c) 1999-2001 The Apache Software Foundation.          *
    *                           All rights reserved.                            *
    *                                                                           *
    * ========================================================================= *
  @@ -56,326 +56,203 @@
    * ========================================================================= */
   package org.apache.catalina.connector.warp;
   
  -import java.io.*;
  -import java.net.*;
  +import java.io.IOException;
  +import java.io.InputStream;
  +import java.io.OutputStream;
  +import java.net.Socket;
  +
   import org.apache.catalina.Lifecycle;
   import org.apache.catalina.LifecycleEvent;
  -import org.apache.catalina.LifecycleException;
   import org.apache.catalina.LifecycleListener;
  -import org.apache.catalina.util.LifecycleSupport;
  +
  +public class WarpConnection implements LifecycleListener, Runnable {
   
  -/**
  - *
  - * @author <a href="mailto:pier.fumagalli@eng.sun.com">Pier Fumagalli</a>
  - * @author Copyright &copy; 1999, 2000 <a href="http://www.apache.org">The
  - *         Apache Software Foundation.
  - * @version CVS $Id: WarpConnection.java,v 1.7 2001/01/24 23:10:42 pier Exp $
  - */
  -public class WarpConnection implements Lifecycle, Runnable {
  -
  -    // -------------------------------------------------------------- CONSTANTS
  -
  -    /** Our debug flag status (Used to compile out debugging information). */
  -    private static final boolean DEBUG=WarpDebug.DEBUG;
  -
  -    // -------------------------------------------------------- LOCAL VARIABLES
  -
  -    /** The lifecycle event support for this component. */
  -    private LifecycleSupport lifecycle=null;
  -    /** The WarpHandlerTable contains the list of all current handlers. */
  -    private WarpHandlerTable table=null;
  -    /** The name of this connection. */
  -    private String name=null;
  -    /** Wether we started or not. */
  +    /* ==================================================================== */
  +    /* Instance variables                                                   */
  +    /* ==================================================================== */
  +
  +    /* -------------------------------------------------------------------- */
  +    /* Local variables */
  +
  +    /** Our socket input stream. */
  +    private InputStream input=null;
  +    /** Our socket output stream. */
  +    private OutputStream output=null;
  +    /** The started flag. */
       private boolean started=false;
  -    /** The number of active connections. */
  -    private static int num=0;
  +    /** The local thread. */
  +    private Thread thread=null;
  +    /** Our logger. */
  +    private WarpLogger logger=null;
   
  -    // -------------------------------------------------------- BEAN PROPERTIES
  +    /* -------------------------------------------------------------------- */
  +    /* Bean variables */
   
  -    /** The socket used in this connection. */
  +    /** The socket this connection is working on. */
       private Socket socket=null;
  -    /** The connector wich created this connection. */
  +    /** The connector instance. */
       private WarpConnector connector=null;
   
  -    // ------------------------------------------------------------ CONSTRUCTOR
  +    /* ==================================================================== */
  +    /* Constructor                                                          */
  +    /* ==================================================================== */
   
       /**
  -     * Create a new WarpConnection instance.
  +     * Construct a new instance of a <code>WarpConnection</code>.
        */
       public WarpConnection() {
           super();
  -        this.lifecycle=new LifecycleSupport(this);
  -        this.table=new WarpHandlerTable();
  -        if (DEBUG) this.debug("New instance created");
  +        this.logger=new WarpLogger(this);
       }
   
  -    // --------------------------------------------------------- PUBLIC METHODS
  +    /* ==================================================================== */
  +    /* Bean methods                                                         */
  +    /* ==================================================================== */
   
       /**
  -     * Run the thread waiting on the socket, reading packets from the client
  -     * and dispatching them to the appropriate handler.
  +     * Set the socket this connection is working on.
        */
  -    public void run() {
  -        WarpHandler han=null;
  -        InputStream in=null;
  -        int rid=0;
  -        int typ=0;
  -        int len=0;
  -        int ret=0;
  -        int b1=0;
  -        int b2=0;
  -        byte buf[]=null;
  -
  -        // Log the connection opening
  -        num++;
  -        if (DEBUG) this.debug("Connection started (num="+num+") "+this.name);
  -
  -        try {
  -            // Open the socket InputStream
  -            in=this.socket.getInputStream();
  -
  -            // Read packets
  -            while(this.started) {
  -                // RID number
  -                b1=in.read();
  -                b2=in.read();
  -                if ((b1 | b2)==-1) {
  -                    this.log("Premature RID end");
  -                    break;
  -                }
  -                rid=(((b1 & 0x0ff) << 8) | (b2 & 0x0ff));
  -                // Packet type
  -                b1=in.read();
  -                b2=in.read();
  -                if ((b1 | b2)==-1) {
  -                    this.log("Premature TYPE end");
  -                    break;
  -                }
  -                typ=(((b1 & 0x0ff) << 8) | (b2 & 0x0ff));
  -                // Packet payload length
  -                b1=in.read();
  -                b2=in.read();
  -                if ((b1 | b2)==-1) {
  -                    this.log("Premature LEN end");
  -                    break;
  -                }
  -                len=(((b1 & 0x0ff) << 8) | (b2 & 0x0ff));
  -                // Packet payload
  -                buf=new byte[len];
  -                if ((ret=in.read(buf,0,len))!=len) {
  -                    this.log("Premature packet end"+" ("+ret+" of "+len+")");
  -                    break;
  -                }
  -
  -                if (DEBUG) this.debug("Received packet RID="+rid+" TYP="+typ);
  -
  -                // Check if we have the special RID 0x0ffff (disconnect)
  -                if (rid==0x0ffff) {
  -                    this.log("Connection closing ("+new String(buf)+")");
  -                    break;
  -                }
  -
  -                // Dispatch packet
  -                synchronized (this) { han=this.table.get(rid); }
  -                if (han==null) {
  -                    this.log("Handler for RID "+rid+" not found");
  -                    break;
  -                }
  -                han.processData(typ,buf);
  -            }
  -        } catch (IOException e) {
  -            if (this.started) e.printStackTrace(System.err);
  -        }
  -
  -        // Close this connection before terminating the thread
  -        try {
  -            this.stop();
  -        } catch (LifecycleException e) {
  -            this.log(e);
  -        }
  -        num--;
  -        if (DEBUG) this.debug("Connection ended (num="+num+") "+this.name);
  -    }
  -
  -    /**
  -     * Initialize this connection.
  -     *
  -     * @param sock The socket used by this connection to transfer data.
  -     */
  -    public void start()
  -    throws LifecycleException {
  -        // Paranoia checks.
  -        if (this.socket==null)
  -            throw new LifecycleException("Null socket");
  -        if (this.connector==null)
  -            throw new LifecycleException("Null connector");
  -
  -        // Register the WarpConnectionHandler for RID=0 (connection)
  -        this.started=true;
  -        WarpHandler h=new WarpConnectionHandler();
  -        h.setConnection(this);
  -        h.setRequestID(0);
  -        h.start();
  -        // Paranoia check
  -        if(this.registerHandler(h,0)!=true) {
  -            this.stop();
  -            throw new LifecycleException("Cannot register connection handler");
  -        }
  -        // Set the thread and connection name and start the thread
  -        this.name=this.socket.getInetAddress().getHostAddress();
  -        this.name=this.name+":"+this.socket.getPort();
  -        new Thread(this,name).start();
  +    public void setSocket(Socket socket) {
  +        this.socket=socket;
       }
   
       /**
  -     * Send a WARP packet.
  +     * Return the socket this connection is working on.
        */
  -    public void send(int rid, int type, byte buffer[], int offset, int len)
  -    throws IOException {
  -        if (this.socket==null) throw new IOException("Connection closed "+type);
  -        OutputStream out=this.socket.getOutputStream();
  -        byte hdr[]=new byte[6];
  -        // Set the RID number
  -        hdr[0]=(byte)((rid>>8)&0x0ff);
  -        hdr[1]=(byte)(rid&0x0ff);
  -        // Set the TYPE
  -        hdr[2]=(byte)((type>>8)&0x0ff);
  -        hdr[3]=(byte)(type&0x0ff);
  -        // Set the payload length
  -        hdr[4]=(byte)((len>>8)&0x0ff);
  -        hdr[5]=(byte)(len&0x0ff);
  -        // Send the header and payload
  -        synchronized(this) {
  -            out.write(hdr,0,6);
  -            out.write(buffer,offset,len);
  -            out.flush();
  -        }
  -        if (DEBUG) this.debug("Sending packet RID="+rid+" TYP="+type);
  +    public Socket getSocket() {
  +        return(this.socket);
       }
   
       /**
  -     * Close this connection.
  +     * Set the <code>WarpConnector</code> associated with this connection.
        */
  -    public void stop()
  -    throws LifecycleException {
  -        this.started=false;
  -        // Stop all handlers
  -        WarpHandler handlers[]=this.table.handlers();
  -        for (int x=0; x<handlers.length; x++) handlers[x].stop();
  -        // Close the socket (this will make the thread exit)
  -        if (this.socket!=null) try {
  -            this.socket.close();
  -        } catch (IOException e) {
  -            this.log(e);
  -            throw new LifecycleException("Closing connection "+this.name,e);
  -        }
  -
  -        this.socket=null;
  -        // Log this step
  -        this.log("Connection closed");
  -    }
  -
  -    /**
  -     * Add a WarpHandler to this connection.
  -     *
  -     * @param han The WarpHandler add to this connection.
  -     * @param rid The RID number associated with the WarpHandler.
  -     * @return If another WarpHandler is associated with this RID return
  -     *         false, otherwise return true.
  -     */
  -    protected synchronized boolean registerHandler(WarpHandler han, int rid) {
  -        if (DEBUG) this.debug("Registering handler for RID "+rid);
  -        return(this.table.add(han, rid));
  +    public void setConnector(WarpConnector connector) {
  +        this.connector=connector;
  +        this.logger.setContainer(connector.getContainer());
       }
   
       /**
  -     * Remove a WarpHandler from this connection.
  -     *
  -     * @param rid The RID number associated with the WarpHandler to remove.
  -     * @return The old WarpHandler associated with the specified RID or null.
  +     * Return the <code>WarpConnector</code> associated with this connection.
        */
  -    protected synchronized WarpHandler removeHandler(int rid) {
  -        return(this.table.remove(rid));
  +    public WarpConnector getConnector() {
  +        return(this.connector);
       }
   
  -    // ----------------------------------------------------------- BEAN METHODS
  +    /* ==================================================================== */
  +    /* Lifecycle methods                                                    */
  +    /* ==================================================================== */
   
       /**
  -     * Return the socket associated with this connection.
  +     * Get notified of events in the connector.
        */
  -    protected WarpConnector getConnector() {
  -        return(this.connector);
  +    public void lifecycleEvent(LifecycleEvent event) {
  +        if (Lifecycle.STOP_EVENT.equals(event.getType())) this.stop();
       }
   
       /**
  -     * Set the socket used by this connection.
  +     * Start working on this connection.
        */
  -    protected void setConnector(WarpConnector connector) {
  -        if (DEBUG) this.debug("Setting connector");
  -        this.connector=connector;
  +    public void start() {
  +        synchronized(this) {
  +            this.started=true;
  +            this.thread=new Thread(this);
  +            this.thread.start();
  +        }
       }
   
       /**
  -     * Return the socket associated with this connection.
  +     * Stop all we're doing on the connection.
        */
  -    protected Socket getSocket() {
  -        return(this.socket);
  +    public void stop() {
  +        synchronized(this) {
  +            try {
  +                this.started=false;
  +                this.socket.close();
  +                this.getConnector().removeLifecycleListener(this);
  +            } catch (IOException e) {
  +                logger.log("Cannot close socket",e);
  +            }
  +        }
       }
   
       /**
  -     * Set the socket used by this connection.
  +     * Process data from the socket.
        */
  -    protected void setSocket(Socket socket) {
  -        if (DEBUG) this.debug("Setting socket");
  -        this.socket=socket;
  -    }
  +    public void run() {
  +        WarpPacket packet=new WarpPacket();
   
  -    // ------------------------------------------------------ LIFECYCLE METHODS
  +        if (Constants.DEBUG) logger.debug("Connection starting");
   
  -    /**
  -     * Add a lifecycle event listener to this component.
  -     */
  -    public void addLifecycleListener(LifecycleListener listener) {
  -        this.lifecycle.addLifecycleListener(listener);
  -    }
  +        try {
  +            this.input=this.socket.getInputStream();
  +            this.output=this.socket.getOutputStream();
  +            if (!new WarpConfigurationHandler().handle(this,packet)) {
  +                logger.log("Configuration handler returned false");
  +                this.stop();
  +            }
  +            WarpRequestHandler requestHandler=new WarpRequestHandler();
  +            while (requestHandler.handle(this,packet));
  +            this.stop();
  +        } catch (IOException e) {
  +            logger.log("Exception on socket",e);
  +        }
   
  -    /**
  -     * Remove a lifecycle event listener from this component.
  -     */
  -    public void removeLifecycleListener(LifecycleListener listener) {
  -        lifecycle.removeLifecycleListener(listener);
  +        if (Constants.DEBUG) logger.debug("Connection terminated");
       }
  -
  -    // ------------------------------------------ LOGGING AND DEBUGGING METHODS
   
  -    /**
  -     * Dump a log message.
  -     */
  -    public void log(String msg) {
  -        if (this.connector!=null) this.connector.log(msg);
  -        else WarpDebug.debug(this,msg);
  -    }
  +    /* ==================================================================== */
  +    /* Public methods                                                       */
  +    /* ==================================================================== */
   
       /**
  -     * Dump information for an Exception.
  +     * Send a WARP packet over this connection.
        */
  -    public void log(Exception exc) {
  -        if (this.connector!=null) this.connector.log(exc);
  -        else WarpDebug.debug(this,exc);
  -    }
  +    public void send(WarpPacket packet)
  +    throws IOException {
  +        if (Constants.DEBUG) {
  +            logger.debug(">> TYPE="+packet.getType()+" LENGTH="+packet.size);
  +            logger.debug(">> "+packet.dump());
  +        }
   
  -    /**
  -     * Dump a debug message.
  -     */
  -    private void debug(String msg) {
  -        if (DEBUG) WarpDebug.debug(this,msg);
  +        this.output.write(packet.getType()&0x0ff);
  +        this.output.write((packet.size>>8)&0x0ff);
  +        this.output.write((packet.size>>0)&0x0ff);
  +        this.output.write(packet.buffer,0,packet.size);
  +        this.output.flush();
  +        packet.reset();
       }
   
       /**
  -     * Dump information for an Exception.
  +     * Receive a WARP packet over this connection.
        */
  -    private void debug(Exception exc) {
  -        if (DEBUG) WarpDebug.debug(this,exc);
  +    public void recv(WarpPacket packet)
  +    throws IOException {
  +        int t=this.input.read();
  +        int l1=this.input.read();
  +        int l2=this.input.read();
  +
  +        if ((t|l1|l2)==-1)
  +            throw new IOException("Premature packet header end");
  +
  +        packet.reset();
  +        packet.setType(t&0x0ff);
  +        packet.size=(( l1 & 0x0ff ) << 8) | ( l2 & 0x0ff );
  +
  +        if (packet.size>0) {
  +            int off=0;
  +            int ret=0;
  +            while (true) {
  +                ret=this.input.read(packet.buffer,off,packet.size-off);
  +                if (ret==-1) 
  +                    throw new IOException("Premature packet payload end");
  +                off+=ret;
  +                if(off==packet.size) break;
  +            }
  +        }
  +            
  +        if (Constants.DEBUG) {
  +            logger.debug("<< TYPE="+packet.getType()+" LENGTH="+packet.size);
  +            logger.debug("<< "+packet.dump());
  +        }
       }
   }