You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by re...@apache.org on 2005/04/18 18:47:48 UTC

cvs commit: jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11 Http11AprProtocol.java Http11AprProcessor.java

remm        2005/04/18 09:47:48

  Modified:    util/java/org/apache/tomcat/util/net AprEndpoint.java
               http11/src/java/org/apache/coyote/http11
                        Http11AprProtocol.java Http11AprProcessor.java
  Log:
  - Add the basis for sendfile (not used yet).
  - Bugfixes to poll handling.
  - The processor will now send the socket to the poller by itself (allows future similar use of sendfile), while the boolean return
    value is only used to indicate if the socket is to be closed.
  
  Revision  Changes    Path
  1.9       +254 -34   jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java
  
  Index: AprEndpoint.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/util/java/org/apache/tomcat/util/net/AprEndpoint.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- AprEndpoint.java	18 Apr 2005 13:57:12 -0000	1.8
  +++ AprEndpoint.java	18 Apr 2005 16:47:47 -0000	1.9
  @@ -17,6 +17,7 @@
   package org.apache.tomcat.util.net;
   
   import java.net.InetAddress;
  +import java.util.HashMap;
   import java.util.Stack;
   import java.util.Vector;
   
  @@ -24,6 +25,7 @@
   import org.apache.commons.logging.LogFactory;
   import org.apache.tomcat.jni.Address;
   import org.apache.tomcat.jni.Error;
  +import org.apache.tomcat.jni.File;
   import org.apache.tomcat.jni.Library;
   import org.apache.tomcat.jni.Poll;
   import org.apache.tomcat.jni.Pool;
  @@ -73,12 +75,6 @@
   
   
       /**
  -     * The socket poller.
  -     */
  -    protected Poller poller = null;
  -
  -
  -    /**
        * The socket poller thread.
        */
       protected Thread pollerThread = null;
  @@ -87,7 +83,6 @@
       /**
        * The sendfile thread.
        */
  -    // FIXME: Add senfile support
       protected Thread sendfileThread = null;
   
   
  @@ -180,12 +175,20 @@
       /**
        * Size of the socket poller.
        */
  -    protected int pollerSize = 512;
  +    protected int pollerSize = 768;
       public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }
       public int getPollerSize() { return pollerSize; }
   
   
       /**
  +     * Size of the sendfile (= concurrent files which can be served).
  +     */
  +    protected int sendfileSize = 256;
  +    public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; }
  +    public int getSendfileSize() { return sendfileSize; }
  +
  +
  +    /**
        * Server socket port.
        */
       protected int port;
  @@ -283,10 +286,30 @@
        */
       protected int keepAliveCount = 0;
       public int getKeepAliveCount() { return keepAliveCount; }
  -    public void setKeepAliveCount(int keepAliveCount) { this.keepAliveCount = keepAliveCount; }
  +    
  +    
  +    /**
  +     * Number of sendfile sockets.
  +     */
  +    protected int sendfileCount = 0;
  +    public int getSendfileCount() { return sendfileCount; }
  +    
  +    
  +    /**
  +     * The socket poller.
  +     */
  +    protected Poller poller = null;
  +    public Poller getPoller() { return poller; }
   
   
       /**
  +     * The static file sender.
  +     */
  +    protected Sendfile sendfile = null;
  +    public Sendfile getSendfile() { return sendfile; }
  +    
  +    
  +    /**
        * Dummy maxSpareThreads property.
        */
       public int getMaxSpareThreads() { return 0; }
  @@ -412,14 +435,20 @@
               acceptorThread.start();
   
               // Start poller thread
  -            poller = new Poller(pollerSize);
  +            poller = new Poller();
  +            poller.init();
               pollerThread = new Thread(poller, getName() + "-Poller");
               pollerThread.setPriority(getThreadPriority());
               pollerThread.setDaemon(true);
               pollerThread.start();
   
               // Start sendfile thread
  -            // FIXME: Implement sendfile support
  +            sendfile = new Sendfile();
  +            sendfile.init();
  +            sendfileThread = new Thread(sendfile, getName() + "-Sendfile");
  +            sendfileThread.setPriority(getThreadPriority());
  +            sendfileThread.setDaemon(true);
  +            sendfileThread.start();
           }
       }
   
  @@ -441,6 +470,8 @@
           if (running) {
               running = false;
               unlockAccept();
  +            poller.destroy();
  +            sendfile.destroy();
               acceptorThread = null;
               pollerThread = null;
               sendfileThread = null;
  @@ -519,14 +550,14 @@
       }
   
   
  -    protected boolean processSocket(long s) {
  +    protected boolean processSocket(long socket, long pool) {
           // Process the connection
           int step = 1;
           boolean result = true;
           try {
   
               // 1: Set socket options: timeout, linger, etc
  -            setSocketOptions(s);
  +            setSocketOptions(socket);
   
               // 2: SSL handshake
               step = 2;
  @@ -539,7 +570,7 @@
   
               // 3: Process the connection
               step = 3;
  -            result = getHandler().process(s);
  +            result = getHandler().process(socket, pool);
   
           } catch (Throwable t) {
               if (step == 2) {
  @@ -549,11 +580,8 @@
               } else {
                   log.error(sm.getString("endpoint.err.unexpected"), t);
               }
  -            // Try to close the socket
  -            try {
  -                Socket.close(s);
  -            } catch (Exception e) {
  -            }
  +            // Tell to close the socket
  +            result = false;
           }
           return result;
       }
  @@ -700,30 +728,35 @@
   
       /**
        * Poller class.
  -     *
  -     * FIXME: Windows support using 64 sized pollers
        */
  -    protected class Poller implements Runnable {
  +    public class Poller implements Runnable {
   
           protected long serverPollset = 0;
           protected long pool = 0;
           protected long[] desc;
   
  -        public Poller(int size) {
  +        protected synchronized void init() {
               pool = Pool.create(serverSockPool);
               try {
  -                serverPollset = Poll.create(size, pool, 0, soTimeout * 1000);
  +                serverPollset = Poll.create(pollerSize, pool, 0, soTimeout * 1000);
               } catch (Error e) {
                   // FIXME: more appropriate logging
                   e.printStackTrace();
               }
  -            desc = new long[size * 4];
  +            desc = new long[pollerSize * 4];
           }
   
  -        public synchronized void add(long socket, long pool) {
  +        protected void destroy() {
  +            Pool.destroy(pool);
  +        }
  +        
  +        public void add(long socket, long pool) {
               int rv = Poll.add(serverPollset, socket, pool, Poll.APR_POLLIN);
               if (rv == Status.APR_SUCCESS) {
                   keepAliveCount++;
  +            } else {
  +                // Can't do anything: close the socket right away
  +                Pool.destroy(pool);
               }
           }
   
  @@ -777,9 +810,14 @@
                               // Hand this socket off to a worker
                               getWorkerThread().assign(desc[n*4+1], desc[n*4+2]);
                           }
  -                    }
  -                    else if (rv < 0) {
  -                        // TODO: Poll is probably unusable. So it should bail out.
  +                    } else if (rv < 0) {
  +                        // FIXME: Log with WARN at least
  +                        // Handle poll critical failure
  +                        Pool.clear(serverSockPool);
  +                        synchronized (this) {
  +                            destroy();
  +                            init();
  +                        }
                       }
                   } catch (Throwable t) {
                       // FIXME: Proper logging
  @@ -880,10 +918,7 @@
                       continue;
   
                   // Process the request from this socket
  -                if (processSocket(socket)) {
  -                    // If the socket is still open, add it to the poller
  -                    poller.add(socket, pool);
  -                } else {
  +                if (!processSocket(socket, pool)) {
                       // Close socket and pool
                       Pool.destroy(pool);
                       pool = 0;
  @@ -916,6 +951,191 @@
       }
   
   
  +    // ----------------------------------------------- SendfileData Inner Class
  +
  +
  +    /**
  +     * SendfileData class.
  +     */
  +    public class SendfileData {
  +        // File
  +        public String fileName;
  +        public long fd;
  +        public long fdpool;
  +        // Range information
  +        public long start;
  +        public long end;
  +        // Socket pool
  +        public long pool;
  +        // Position
  +        public long pos;
  +    }
  +    
  +    
  +    // --------------------------------------------------- Sendfile Inner Class
  +
  +
  +    /**
  +     * Sendfile class.
  +     */
  +    public class Sendfile implements Runnable {
  +        
  +        protected long sendfilePollset = 0;
  +        protected long pool = 0;
  +        protected long[] desc;
  +        protected HashMap sendfileData;
  +        protected SendfileData[] state;
  +
  +        protected void init() {
  +            pool = Pool.create(serverSockPool);
  +            try {
  +                sendfilePollset = Poll.create(sendfileSize, pool, 0, soTimeout * 1000);
  +            } catch (Error e) {
  +                // FIXME: more appropriate logging
  +                e.printStackTrace();
  +            }
  +            desc = new long[sendfileSize * 4];
  +            sendfileData = new HashMap(sendfileSize);
  +            state = new SendfileData[sendfileSize];
  +        }
  +        
  +        protected void destroy() {
  +            sendfileData.clear();
  +            Pool.destroy(pool);
  +        }
  +        
  +        public void add(long socket, SendfileData data) {
  +            // Initialize fd from data given
  +            try {
  +                data.fdpool = Pool.create(data.pool);
  +                data.fd = File.open
  +                    (data.fileName, File.APR_FOPEN_READ 
  +                     | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
  +                     0, data.fdpool);
  +                data.pos = data.start;
  +            } catch (Error e) {
  +                // FIXME: more appropriate logging
  +                e.printStackTrace();
  +                return;
  +            }
  +            synchronized (this) {
  +                sendfileData.put(new Long(socket), data);
  +                int rv = Poll.add(sendfilePollset, socket, 0, Poll.APR_POLLOUT);
  +                if (rv == Status.APR_SUCCESS) {
  +                    sendfileCount++;
  +                } else {
  +                    // FIXME: Log with a WARN at least, as the request will 
  +                    // fail from the user perspective
  +                    // Can't do anything: close the socket right away
  +                    Pool.destroy(data.pool);
  +                }
  +            }
  +        }
  +        
  +        public void remove(long socket) {
  +            synchronized (this) {
  +                int rv = Poll.remove(sendfilePollset, socket);
  +                if (rv == Status.APR_SUCCESS) {
  +                    sendfileCount--;
  +                }
  +                sendfileData.remove(new Long(socket));
  +            }
  +        }
  +        
  +        /**
  +         * The background thread that polls the sendfile sockets, writes pending file
  +         * data to them, and hands each socket off to a worker once its file is sent.
  +         */
  +        public void run() {
  +
  +            // Loop until we receive a shutdown command
  +            while (running) {
  +
  +                // Loop if endpoint is paused
  +                while (paused) {
  +                    try {
  +                        Thread.sleep(1000);
  +                    } catch (InterruptedException e) {
  +                        // Ignore
  +                    }
  +                }
  +
  +                while (sendfileCount < 1) {
  +                    try {
  +                        Thread.sleep(10);
  +                    } catch (InterruptedException e) {
  +                        // Ignore
  +                    }
  +                }
  +
  +                try {
  +                    // Pool for the specified interval
  +                    int rv = Poll.poll(sendfilePollset, pollTime, desc);
  +                    if (rv > 0) {
  +                        for (int n = 0; n < rv; n++) {
  +                            // Problem events
  +                            if (((desc[n*4] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
  +                                    || ((desc[n*4] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
  +                                // Close socket and clear pool
  +                                remove(desc[n*4+1]);
  +                                // Destroy file descriptor pool, which should close the file
  +                                Pool.destroy(state[n].fdpool);
  +                                // Close the socket, as the response would be incomplete
  +                                Pool.destroy(state[n].pool);
  +                                continue;
  +                            }
  +                            // Get the sendfile state
  +                            state[n] = 
  +                                (SendfileData) sendfileData.get(new Long(desc[n*4+1]));
  +                            // Write some data using sendfile
  +                            int nw = Socket.sendfilet(desc[n*4+1], state[n].fd, 
  +                                                      null, null, state[n].pos, 
  +                                                      (int) (state[n].end - state[n].pos), 0, 0);
  +                            if (nw < 0) {
  +                                // Close socket and clear pool
  +                                remove(desc[n*4+1]);
  +                                // Destroy file descriptor pool, which should close the file
  +                                Pool.destroy(state[n].fdpool);
  +                                // Close the socket, as the response would be incomplete
  +                                Pool.destroy(state[n].pool);
  +                                continue;
  +                            }
  +                            state[n].pos = state[n].pos + nw;
  +                            if (state[n].pos >= state[n].end) {
  +                                remove(desc[n*4+1]);
  +                                // Destroy file descriptor pool, which should close the file
  +                                Pool.destroy(state[n].fdpool);
  +                                // If all done hand this socket off to a worker for 
  +                                // processing of further requests
  +                                getWorkerThread().assign(desc[n*4+1], state[n].pool);
  +                            }
  +                        }
  +                    } else if (rv < 0) {
  +                        // Handle poll critical failure
  +                        // FIXME: Log with WARN at least
  +                        Pool.clear(serverSockPool);
  +                        synchronized (this) {
  +                            destroy();
  +                            init();
  +                        }
  +                    }
  +                } catch (Throwable t) {
  +                    // FIXME: Proper logging
  +                    t.printStackTrace();
  +                }
  +                
  +            }
  +
  +            // Notify the threadStop() method that we have shut ourselves down
  +            synchronized (threadSync) {
  +                threadSync.notifyAll();
  +            }
  +
  +        }
  +
  +    }
  +
  +    
       // -------------------------------------- ConnectionHandler Inner Interface
   
   
  @@ -925,7 +1145,7 @@
        * thread local fields.
        */
       public interface Handler {
  -        public boolean process(long socket);
  +        public boolean process(long socket, long pool);
       }
   
   
  
  
  
  1.3       +2 -2      jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProtocol.java
  
  Index: Http11AprProtocol.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProtocol.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Http11AprProtocol.java	14 Apr 2005 23:32:54 -0000	1.2
  +++ Http11AprProtocol.java	18 Apr 2005 16:47:47 -0000	1.3
  @@ -640,7 +640,7 @@
               return  thData;
           }
   
  -        public boolean process(long socket) {
  +        public boolean process(long socket, long pool) {
               Http11AprProcessor processor=null;
               try {
                   // FIXME: It is also possible to use the TWA data, so keep init() [] for
  @@ -672,7 +672,7 @@
                   processor.setSocket( socket );
                   */
   
  -                return processor.process(socket);
  +                return processor.process(socket, pool);
   
               } catch(java.net.SocketException e) {
                   // SocketExceptions are normal
  
  
  
  1.7       +3 -4      jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProcessor.java
  
  Index: Http11AprProcessor.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/http11/src/java/org/apache/coyote/http11/Http11AprProcessor.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- Http11AprProcessor.java	16 Apr 2005 17:51:48 -0000	1.6
  +++ Http11AprProcessor.java	18 Apr 2005 16:47:47 -0000	1.7
  @@ -716,12 +716,9 @@
        * Process pipelined HTTP requests using the specified input and output
        * streams.
        *
  -     * @param input stream from which the HTTP requests will be read
  -     * @param output stream which will be used to output the HTTP
  -     * responses
        * @throws IOException error during an I/O operation
        */
  -    public boolean process(long socket)
  +    public boolean process(long socket, long pool)
           throws IOException {
           ThreadWithAttributes thrA=
                   (ThreadWithAttributes)Thread.currentThread();
  @@ -764,6 +761,8 @@
                       // and the method should return true
                       rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
                       openSocket = true;
  +                    // Add the socket to the poller
  +                    endpoint.getPoller().add(socket, pool);
                       break;
                   }
                   request.setStartTime(System.currentTimeMillis());
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org