You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by co...@apache.org on 2005/11/24 06:20:54 UTC

svn commit: r348655 - /tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java

Author: costin
Date: Wed Nov 23 21:20:50 2005
New Revision: 348655

URL: http://svn.apache.org/viewcvs?rev=348655&view=rev
Log:
Refactoring - leave only the core methods, no ThreadPool or specific
code. 

Modified:
    tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java

Modified: tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java
URL: http://svn.apache.org/viewcvs/tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java?rev=348655&r1=348654&r2=348655&view=diff
==============================================================================
--- tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java (original)
+++ tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java Wed Nov 23 21:20:50 2005
@@ -17,21 +17,17 @@
 package org.apache.tomcat.util.net;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.BindException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
-import java.security.AccessControlException;
-import java.util.Stack;
-import java.util.Vector;
+import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.threads.ThreadPool;
-import org.apache.tomcat.util.threads.ThreadPoolRunnable;
+import org.apache.tomcat.util.threads.ThreadPool.ThreadPoolListener;
 
 /* Similar with MPM module in Apache2.0. Handles all the details related with
    "tcp server" functionality - thread management, accept policy, etc.
@@ -60,98 +56,123 @@
 
     static Log log=LogFactory.getLog(PoolTcpEndpoint.class );
 
-    private StringManager sm = 
+    protected StringManager sm = 
         StringManager.getManager("org.apache.tomcat.util.net.res");
 
-    private static final int BACKLOG = 100;
-    private static final int TIMEOUT = 1000;
+    protected static final int BACKLOG = 100;
+    protected static final int TIMEOUT = 1000;
 
-    private final Object threadSync = new Object();
+    protected int backlog = BACKLOG;
+    protected int serverTimeout = TIMEOUT;
 
-    private int backlog = BACKLOG;
-    private int serverTimeout = TIMEOUT;
+    protected InetAddress inet;
+    protected int port;
 
-    private InetAddress inet;
-    private int port;
+    protected ServerSocket serverSocket;
 
-    private ServerSocketFactory factory;
-    private ServerSocket serverSocket;
-
-    private volatile boolean running = false;
-    private volatile boolean paused = false;
-    private boolean initialized = false;
-    private boolean reinitializing = false;
-    static final int debug=0;
+    protected volatile boolean running = false;
+    protected volatile boolean paused = false;
+    protected boolean initialized = false;
+    protected boolean reinitializing = false;
 
     protected boolean tcpNoDelay=false;
     protected int linger=100;
     protected int socketTimeout=-1;
-    private boolean lf = true;
-
     
     // ------ Leader follower fields
 
     
     TcpConnectionHandler handler;
-    ThreadPoolRunnable listener;
-    ThreadPool tp;
+    // ------ Master slave fields
 
+    protected int curThreads = 0;
+    protected int maxThreads = 20;
+    protected int maxSpareThreads = 20;
+    protected int minSpareThreads = 20;
+    protected String type;
+
+    protected String name = "EP"; // base name for threads
     
-    // ------ Master slave fields
+    protected int threadPriority;
 
-    /* The background thread. */
-    private Thread thread = null;
-    /* Available processors. */
-    private Stack workerThreads = new Stack();
-    private int curThreads = 0;
-    private int maxThreads = 20;
-    /* All processors which have been created. */
-    private Vector created = new Vector();
+    protected boolean daemon = true;
 
+    private ArrayList listeners = new ArrayList();
     
     public PoolTcpEndpoint() {
-	tp = new ThreadPool();
     }
 
-    public PoolTcpEndpoint( ThreadPool tp ) {
-        this.tp=tp;
+    public static PoolTcpEndpoint getEndpoint(String type) {
+        String cn = null;
+        if( "apr".equals( type )) {
+            cn = "org.apache.tomcat.util.net.AprEndpoint";
+        }
+        if( "lf".equals( type )) {
+            cn = "org.apache.tomcat.util.net.LeaderFollowerEndpoint";
+        }
+        if( "acc".equals( type )) {
+            cn = "org.apache.tomcat.util.net.AcceptorEndpoint";
+        }
+        if( "ms".equals( type )) {
+            cn = "org.apache.tomcat.util.net.MasterSlaveEndpoint";
+        }
+        PoolTcpEndpoint res = null; 
+        if( cn != null ) {
+            try {
+                Class c = Class.forName( cn );
+                res = (PoolTcpEndpoint)c.newInstance();
+            } catch( Throwable t ) {
+                throw new RuntimeException("Can't create endpoint " + cn);
+            }            
+        }
+        if( res == null ) {
+            res = new SimpleEndpoint();
+        }
+        res.type = type;
+        return res;
     }
 
     // -------------------- Configuration --------------------
+    
+    public String getName() {
+        return name;
+    }
 
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    
     public void setMaxThreads(int maxThreads) {
-	if( maxThreads > 0)
-	    tp.setMaxThreads(maxThreads);
+        this.maxThreads = maxThreads;
     }
 
     public int getMaxThreads() {
-        return tp.getMaxThreads();
+        return maxThreads;
     }
 
     public void setMaxSpareThreads(int maxThreads) {
-	if(maxThreads > 0) 
-	    tp.setMaxSpareThreads(maxThreads);
+        this.maxSpareThreads = maxThreads;
     }
 
     public int getMaxSpareThreads() {
-        return tp.getMaxSpareThreads();
+        return maxSpareThreads;
     }
 
     public void setMinSpareThreads(int minThreads) {
-	if(minThreads > 0) 
-	    tp.setMinSpareThreads(minThreads);
+        this.minSpareThreads = minThreads;
     }
 
     public int getMinSpareThreads() {
-        return tp.getMinSpareThreads();
+        return minSpareThreads;
     }
 
     public void setThreadPriority(int threadPriority) {
-      tp.setThreadPriority(threadPriority);
+        this.threadPriority = threadPriority;
     }
 
     public int getThreadPriority() {
-      return tp.getThreadPriority();
+        return threadPriority;
     }
 
     public int getPort() {
@@ -173,15 +194,11 @@
     public void setServerSocket(ServerSocket ss) {
 	    serverSocket = ss;
     }
-
-    public void setServerSocketFactory(  ServerSocketFactory factory ) {
-	    this.factory=factory;
+    
+    public ServerSocket getServerSocket() {
+        return serverSocket;
     }
 
-   ServerSocketFactory getServerSocketFactory() {
- 	    return factory;
-   }
-
     public void setConnectionHandler( TcpConnectionHandler handler ) {
     	this.handler=handler;
     }
@@ -257,19 +274,11 @@
     }
 
     public String getStrategy() {
-        if (lf) {
-            return "lf";
-        } else {
-            return "ms";
-        }
+        return type;
     }
     
     public void setStrategy(String strategy) {
-        if ("ms".equals(strategy)) {
-            lf = false;
-        } else {
-            lf = true;
-        }
+        // shouldn't be used.
     }
 
     public int getCurrentThreadCount() {
@@ -277,83 +286,40 @@
     }
     
     public int getCurrentThreadsBusy() {
-        return curThreads - workerThreads.size();
+        return curThreads;
     }
     
     // -------------------- Public methods --------------------
 
     public void initEndpoint() throws IOException, InstantiationException {
-        try {
-            if(factory==null)
-                factory=ServerSocketFactory.getDefault();
-            if(serverSocket==null) {
-                try {
-                    if (inet == null) {
-                        serverSocket = factory.createSocket(port, backlog);
-                    } else {
-                        serverSocket = factory.createSocket(port, backlog, inet);
-                    }
-                } catch ( BindException be ) {
-                    throw new BindException(be.getMessage() + ":" + port);
-                }
-            }
-            if( serverTimeout >= 0 )
-                serverSocket.setSoTimeout( serverTimeout );
-        } catch( IOException ex ) {
-            throw ex;
-        } catch( InstantiationException ex1 ) {
-            throw ex1;
-        }
-        initialized = true;
     }
     
     public void startEndpoint() throws IOException, InstantiationException {
-        if (!initialized) {
-            initEndpoint();
-        }
-        if (lf) {
-            tp.start();
-        }
-        running = true;
-        paused = false;
-        if (lf) {
-            listener = new LeaderFollowerWorkerThread(this);
-            tp.runIt(listener);
-        } else {
-            maxThreads = getMaxThreads();
-            threadStart();
-        }
     }
 
     public void pauseEndpoint() {
-        if (running && !paused) {
-            paused = true;
-            unlockAccept();
-        }
     }
 
     public void resumeEndpoint() {
-        if (running) {
-            paused = false;
-        }
     }
 
     public void stopEndpoint() {
-        if (running) {
-            if (lf) {
-                tp.shutdown();
-            }
-            running = false;
-            if (serverSocket != null) {
-                closeServerSocket();
-            }
-            if (!lf) {
-                threadStop();
-            }
-            initialized=false ;
-        }
     }
 
+    protected void processSocket(Socket s, TcpConnection con, 
+            Object[] threadData) {
+    }
+
+
+    /** To notify worker done, recycle
+     */
+    public void workerDone(Runnable r) {
+        
+    }
+
+
+    // ---------------- Utils ----------------------
+    
     protected void closeServerSocket() {
         if (!paused)
             unlockAccept();
@@ -393,109 +359,7 @@
         }
     }
 
-    // -------------------- Private methods
-
-    Socket acceptSocket() {
-        if( !running || serverSocket==null ) return null;
-
-        Socket accepted = null;
-
-    	try {
-            if(factory==null) {
-                accepted = serverSocket.accept();
-            } else {
-                accepted = factory.acceptSocket(serverSocket);
-            }
-            if (null == accepted) {
-                log.warn(sm.getString("endpoint.warn.nullSocket"));
-            } else {
-                if (!running) {
-                    accepted.close();  // rude, but unlikely!
-                    accepted = null;
-                } else if (factory != null) {
-                    factory.initSocket( accepted );
-                }
-            }
-        }
-        catch(InterruptedIOException iioe) {
-            // normal part -- should happen regularly so
-            // that the endpoint can release if the server
-            // is shutdown.
-        }
-        catch (AccessControlException ace) {
-            // When using the Java SecurityManager this exception
-            // can be thrown if you are restricting access to the
-            // socket with SocketPermission's.
-            // Log the unauthorized access and continue
-            String msg = sm.getString("endpoint.warn.security",
-                                      serverSocket, ace);
-            log.warn(msg);
-        }
-        catch (IOException e) {
-
-            String msg = null;
-
-            if (running) {
-                msg = sm.getString("endpoint.err.nonfatal",
-                        serverSocket, e);
-                log.error(msg, e);
-            }
-
-            if (accepted != null) {
-                try {
-                    accepted.close();
-                } catch(Throwable ex) {
-                    msg = sm.getString("endpoint.err.nonfatal",
-                                       accepted, ex);
-                    log.warn(msg, ex);
-                }
-                accepted = null;
-            }
-
-            if( ! running ) return null;
-            reinitializing = true;
-            // Restart endpoint when getting an IOException during accept
-            synchronized (threadSync) {
-                if (reinitializing) {
-                    reinitializing = false;
-                    // 1) Attempt to close server socket
-                    closeServerSocket();
-                    initialized = false;
-                    // 2) Reinit endpoint (recreate server socket)
-                    try {
-                        msg = sm.getString("endpoint.warn.reinit");
-                        log.warn(msg);
-                        initEndpoint();
-                    } catch (Throwable t) {
-                        msg = sm.getString("endpoint.err.nonfatal",
-                                           serverSocket, t);
-                        log.error(msg, t);
-                    }
-                    // 3) If failed, attempt to restart endpoint
-                    if (!initialized) {
-                        msg = sm.getString("endpoint.warn.restart");
-                        log.warn(msg);
-                        try {
-                            stopEndpoint();
-                            initEndpoint();
-                            startEndpoint();
-                        } catch (Throwable t) {
-                            msg = sm.getString("endpoint.err.fatal",
-                                               serverSocket, t);
-                            log.error(msg, t);
-                        }
-                        // Current thread is now invalid: kill it
-                        throw new ThreadDeath();
-                    }
-                }
-            }
-
-        }
-
-        return accepted;
-    }
-
-    void setSocketOptions(Socket socket)
+    protected void setSocketOptions(Socket socket)
         throws SocketException {
         if(linger >= 0 ) 
             socket.setSoLinger( true, linger);
@@ -505,179 +369,46 @@
             socket.setSoTimeout( socketTimeout );
     }
 
-    
-    void processSocket(Socket s, TcpConnection con, Object[] threadData) {
-        // Process the connection
-        int step = 1;
-        try {
-            
-            // 1: Set socket options: timeout, linger, etc
-            setSocketOptions(s);
-            
-            // 2: SSL handshake
-            step = 2;
-            if (getServerSocketFactory() != null) {
-                getServerSocketFactory().handshake(s);
-            }
-            
-            // 3: Process the connection
-            step = 3;
-            con.setEndpoint(this);
-            con.setSocket(s);
-            getConnectionHandler().processConnection(con, threadData);
-            
-        } catch (SocketException se) {
-            log.error(sm.getString("endpoint.err.socket", s.getInetAddress()),
-                    se);
-            // Try to close the socket
-            try {
-                s.close();
-            } catch (IOException e) {
-            }
-        } catch (Throwable t) {
-            if (step == 2) {
-                if (log.isDebugEnabled()) {
-                    log.debug(sm.getString("endpoint.err.handshake"), t);
-                }
-            } else {
-                log.error(sm.getString("endpoint.err.unexpected"), t);
-            }
-            // Try to close the socket
-            try {
-                s.close();
-            } catch (IOException e) {
-            }
-        } finally {
-            if (con != null) {
-                con.recycle();
-            }
-        }
-    }
-    
-
-    // -------------------------------------------------- Master Slave Methods
-
-
     /**
-     * Create (or allocate) and return an available processor for use in
-     * processing a specific HTTP request, if possible.  If the maximum
-     * allowed processors have already been created and are in use, return
-     * <code>null</code> instead.
+     * The background thread that listens for incoming TCP/IP connections and
+     * hands them off to an appropriate processor.
      */
-    private MasterSlaveWorkerThread createWorkerThread() {
-
-        synchronized (workerThreads) {
-            if (workerThreads.size() > 0) {
-                return ((MasterSlaveWorkerThread) workerThreads.pop());
-            }
-            if ((maxThreads > 0) && (curThreads < maxThreads)) {
-                return (newWorkerThread());
-            } else {
-                if (maxThreads < 0) {
-                    return (newWorkerThread());
-                } else {
-                    return (null);
-                }
-            }
-        }
-
+    public void run() {
     }
 
-    
-    /**
-     * Create and return a new processor suitable for processing HTTP
-     * requests and returning the corresponding responses.
-     */
-    private MasterSlaveWorkerThread newWorkerThread() {
-
-        MasterSlaveWorkerThread workerThread = 
-            new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads));
-        workerThread.start();
-        created.addElement(workerThread);
-        return (workerThread);
-
+    public void setSSLSupport(boolean secure, String socketFactoryName) throws Exception {
     }
 
-
-    /**
-     * Recycle the specified Processor so that it can be used again.
-     *
-     * @param processor The processor to be recycled
-     */
-    void recycleWorkerThread(MasterSlaveWorkerThread workerThread) {
-        workerThreads.push(workerThread);
+    public void setDaemon(boolean b) {
+        daemon=b;
     }
-
     
-    /**
-     * The background thread that listens for incoming TCP/IP connections and
-     * hands them off to an appropriate processor.
-     */
-    public void run() {
-
-        // Loop until we receive a shutdown command
-        while (running) {
-
-            // Loop if endpoint is paused
-            while (paused) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    // Ignore
-                }
-            }
-
-            // Allocate a new worker thread
-            MasterSlaveWorkerThread workerThread = createWorkerThread();
-            if (workerThread == null) {
-                try {
-                    // Wait a little for load to go down: as a result, 
-                    // no accept will be made until the concurrency is
-                    // lower than the specified maxThreads, and current
-                    // connections will wait for a little bit instead of
-                    // failing right away.
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    // Ignore
-                }
-                continue;
-            }
-            
-            // Accept the next incoming connection from the server socket
-            Socket socket = acceptSocket();
-
-            // Hand this socket off to an appropriate processor
-            workerThread.assign(socket);
-
-            // The processor will recycle itself when it finishes
-
-        }
-
-        // Notify the threadStop() method that we have shut ourselves down
-        synchronized (threadSync) {
-            threadSync.notifyAll();
-        }
-
+    public boolean getDaemon() {
+        return daemon;
     }
 
-
-    /**
-     * Start the background processing thread.
-     */
-    private void threadStart() {
-        thread = new Thread(this, tp.getName());
-        thread.setPriority(getThreadPriority());
-        thread.setDaemon(true);
-        thread.start();
+    protected void threadStart(Thread t) {
+        for( int i=0; i<listeners.size(); i++ ) {
+            EndpointListener tpl=(EndpointListener)listeners.get(i);
+            tpl.threadStart(this, t);
+        }        
     }
 
-
-    /**
-     * Stop the background processing thread.
-     */
-    private void threadStop() {
-        thread = null;
+    protected void threadEnd(Thread t) {
+        for( int i=0; i<listeners.size(); i++ ) {
+            EndpointListener tpl=(EndpointListener)listeners.get(i);
+            tpl.threadStart(this, t);
+        }        
+    }
+    
+    public void addEndpointListener(EndpointListener listener) {
+        listeners.add(listener);
     }
+ 
+    public static interface EndpointListener {
+        public void threadStart( PoolTcpEndpoint ep, Thread t);
 
+        public void threadEnd( PoolTcpEndpoint ep, Thread t);
 
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org