You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2008/12/08 04:32:08 UTC

svn commit: r724239 - /tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Author: fhanik
Date: Sun Dec  7 19:32:07 2008
New Revision: 724239

URL: http://svn.apache.org/viewvc?rev=724239&view=rev
Log:
Remove the synchronized/notifyAll-based thread pool, and use only the built-in pool

Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=724239&r1=724238&r2=724239&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Sun Dec  7 19:32:07 2008
@@ -116,12 +116,6 @@
 
 
     /**
-     * Available workers.
-     */
-    protected WorkerStack workers = null;
-
-
-    /**
      * Running state of the endpoint.
      */
     protected volatile boolean running = false;
@@ -349,7 +343,11 @@
     public Executor getExecutor() { return executor; }
     
     protected boolean useExecutor = true;
-    public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
+    /**
+     * @deprecated Executor is always used
+     * @param useexec
+     */
+    public void setUseExecutor(boolean useexec) { log.info("Setting useExecutor is deprecated. Executors are always used.");}
     public boolean getUseExecutor() { return useExecutor || (executor!=null);}
 
     /**
@@ -359,14 +357,10 @@
     public void setMaxThreads(int maxThreads) {
         this.maxThreads = maxThreads;
         if (running) {
-            if (getUseExecutor() && executor!=null) {
+            if (executor!=null) {
                 if (executor instanceof ThreadPoolExecutor) {
                     ((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads);
                 }
-            }else if (workers!=null){            
-                synchronized(workers) {
-                    workers.resize(maxThreads);
-                }
             }
         }
     }
@@ -857,15 +851,11 @@
             paused = false;
             
             // Create worker collection
-            if (getUseExecutor()) {
-                if ( executor == null ) {
-                    TaskQueue taskqueue = new TaskQueue();
-                    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
-                    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
-                    taskqueue.setParent( (ThreadPoolExecutor) executor, this);
-                }
-            } else if ( executor == null ) {//avoid two thread pools being created
-                workers = new WorkerStack(maxThreads);
+            if ( executor == null ) {
+                TaskQueue taskqueue = new TaskQueue();
+                TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
+                executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+                taskqueue.setParent( (ThreadPoolExecutor) executor, this);
             }
 
             // Start poller threads
@@ -1104,99 +1094,8 @@
     protected boolean isWorkerAvailable() {
         if ( executor != null ) {
             return true;
-        } else {
-            if (workers.size() > 0) {
-                return true;
-            }
-            if ( (maxThreads > 0) && (curThreads < maxThreads)) {
-                return true;
-            } else {
-                if (maxThreads < 0) {
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        }
-    }
-    /**
-     * Create (or allocate) and return an available processor for use in
-     * processing a specific HTTP request, if possible.  If the maximum
-     * allowed processors have already been created and are in use, return
-     * <code>null</code> instead.
-     */
-    protected Worker createWorkerThread() {
-
-        synchronized (workers) {
-            if (workers.size() > 0) {
-                curThreadsBusy++;
-                return (workers.pop());
-            }
-            if ((maxThreads > 0) && (curThreads < maxThreads)) {
-                curThreadsBusy++;
-                if (curThreadsBusy == maxThreads) {
-                    log.info(sm.getString("endpoint.info.maxThreads",
-                            Integer.toString(maxThreads), address,
-                            Integer.toString(port)));
-                }
-                return (newWorkerThread());
-            } else {
-                if (maxThreads < 0) {
-                    curThreadsBusy++;
-                    return (newWorkerThread());
-                } else {
-                    return (null);
-                }
-            }
-        }
-    }
-
-
-    /**
-     * Create and return a new processor suitable for processing HTTP
-     * requests and returning the corresponding responses.
-     */
-    protected Worker newWorkerThread() {
-
-        Worker workerThread = new Worker();
-        workerThread.start();
-        return (workerThread);
-
-    }
-
-
-    /**
-     * Return a new worker thread, and block while to worker is available.
-     */
-    protected Worker getWorkerThread() {
-        // Allocate a new worker thread
-        Worker workerThread = createWorkerThread();
-        while (workerThread == null) {
-            try {
-                synchronized (workers) {
-                    workerThread = createWorkerThread();
-                    if ( workerThread == null ) workers.wait();
-                }
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-            if ( workerThread == null ) workerThread = createWorkerThread();
-        }
-        return workerThread;
-    }
-
-
-    /**
-     * Recycle the specified Processor so that it can be used again.
-     *
-     * @param workerThread The processor to be recycled
-     */
-    protected void recycleWorkerThread(Worker workerThread) {
-        synchronized (workers) {
-            workers.push(workerThread);
-            curThreadsBusy--;
-            workers.notify();
         }
+        return false;
     }
     /**
      * Process given socket.
@@ -1217,15 +1116,11 @@
         try {
             KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
             attachment.setCometNotify(false); //will get reset upon next reg
-            if (executor == null) {
-                getWorkerThread().assign(socket, status);
-            } else {
-                SocketProcessor sc = processorCache.poll();
-                if ( sc == null ) sc = new SocketProcessor(socket,status);
-                else sc.reset(socket,status);
-                if ( dispatch ) executor.execute(sc);
-                else sc.run();
-            }
+            SocketProcessor sc = processorCache.poll();
+            if ( sc == null ) sc = new SocketProcessor(socket,status);
+            else sc.reset(socket,status);
+            if ( dispatch && executor!=null ) executor.execute(sc);
+            else sc.run();
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -1888,162 +1783,6 @@
         protected long lastRegistered = 0;
         protected SendfileData sendfileData = null;
     }
-    // ----------------------------------------------------- Worker Inner Class
-
-
-    /**
-     * Server processor class.
-     */
-    protected class Worker implements Runnable {
-
-
-        protected Thread thread = null;
-        protected boolean available = false;
-        protected Object socket = null;
-        protected SocketStatus status = null;
-
-
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        protected synchronized void assign(Object socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            status = null;
-            available = true;
-            notifyAll();
-
-        }
-
-
-        protected synchronized void assign(Object socket, SocketStatus status) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            this.status = status;
-            available = true;
-            notifyAll();
-        }
-
-
-        /**
-         * Await a newly assigned Socket from our Connector, or <code>null</code>
-         * if we are supposed to shut down.
-         */
-        protected synchronized Object await() {
-
-            // Wait for the Connector to provide a new Socket
-            while (!available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Notify the Connector that we have received this Socket
-            Object socket = this.socket;
-            available = false;
-            notifyAll();
-
-            return (socket);
-
-        }
-
-
-        /**
-         * The background thread that listens for incoming TCP/IP connections and
-         * hands them off to an appropriate processor.
-         */
-        public void run() {
-
-            // Process requests until we receive a shutdown signal
-            while (running) {
-                NioChannel socket = null;
-                SelectionKey key = null;
-                try {
-                    // Wait for the next socket to be assigned
-                    Object channel = await();
-                    if (channel == null)
-                        continue;
-
-                    if ( channel instanceof SocketChannel) {
-                        SocketChannel sc = (SocketChannel)channel;
-                        if ( !setSocketOptions(sc) ) {
-                            try {
-                                sc.socket().close();
-                                sc.close();
-                            }catch ( IOException ix ) {
-                                if ( log.isDebugEnabled() ) log.debug("",ix);
-                            }
-                        } else {
-                            //now we have it registered, remove it from the cache
-                            
-                        }
-                    } else {
-                        socket = (NioChannel)channel;
-                        SocketProcessor sc = processorCache.poll();
-                        if ( sc == null ) sc = new SocketProcessor(socket,status);
-                        else sc.reset(socket,status);
-                        sc.run();
-                    }
-                }catch(CancelledKeyException cx) {
-                    if (socket!=null && key!=null) socket.getPoller().cancelledKey(key,null,false);
-                } catch (OutOfMemoryError oom) {
-                    try {
-                        oomParachuteData = null;
-                        releaseCaches();
-                        log.error("", oom);
-                    }catch ( Throwable oomt ) {
-                        try {
-                            System.err.println(oomParachuteMsg);
-                            oomt.printStackTrace();
-                        }catch (Throwable letsHopeWeDontGetHere){}
-                    }
-                } finally {
-                    //dereference socket to let GC do its job
-                    socket = null;
-                    // Finish up this request
-                    recycleWorkerThread(this);
-                }
-            }
-        }
-
-
-        /**
-         * Start the background processing thread.
-         */
-        public void start() {
-            thread = new Thread(this);
-            thread.setName(getName() + "-" + (++curThreads));
-            thread.setDaemon(true);
-            thread.setPriority(getThreadPriority());
-            thread.start();
-        }
-
-
-    }
 
     // ------------------------------------------------ Application Buffer Handler
     public class NioBufferHandler implements ApplicationBufferHandler {
@@ -2085,84 +1824,6 @@
     }
 
 
-    // ------------------------------------------------- WorkerStack Inner Class
-
-
-    public class WorkerStack {
-
-        protected Worker[] workers = null;
-        protected int end = 0;
-
-        public WorkerStack(int size) {
-            workers = new Worker[size];
-        }
-
-        /** 
-         * Put the object into the queue. If the queue is full (for example if
-         * the queue has been reduced in size) the object will be dropped.
-         * 
-         * @param   object  the object to be appended to the queue (first
-         *                  element).
-         */
-        public void push(Worker worker) {
-            if (end < workers.length) {
-                workers[end++] = worker;
-            } else {
-                curThreads--;
-            }
-        }
-
-        /**
-         * Get the first object out of the queue. Return null if the queue
-         * is empty. 
-         */
-        public Worker pop() {
-            if (end > 0) {
-                return workers[--end];
-            }
-            return null;
-        }
-
-        /**
-         * Get the first object out of the queue, Return null if the queue
-         * is empty.
-         */
-        public Worker peek() {
-            return workers[end];
-        }
-
-        /**
-         * Is the queue empty?
-         */
-        public boolean isEmpty() {
-            return (end == 0);
-        }
-
-        /**
-         * How many elements are there in this queue?
-         */
-        public int size() {
-            return (end);
-        }
-        
-        /**
-         * Resize the queue. If there are too many objects in the queue for the
-         * new size, drop the excess.
-         * 
-         * @param newSize
-         */
-        public void resize(int newSize) {
-            Worker[] newWorkers = new Worker[newSize];
-            int len = workers.length;
-            if (newSize < len) {
-                len = newSize;
-            }
-            System.arraycopy(workers, 0, newWorkers, 0, len);
-            workers = newWorkers;
-        }
-    }
-
-
     // ---------------------------------------------- SocketProcessor Inner Class
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org