You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2007/03/14 00:54:04 UTC

svn commit: r517941 - in /tomcat/tc6.0.x/trunk: java/org/apache/coyote/http11/Http11NioProtocol.java java/org/apache/tomcat/util/net/NioEndpoint.java webapps/docs/config/http.xml

Author: fhanik
Date: Tue Mar 13 16:54:03 2007
New Revision: 517941

URL: http://svn.apache.org/viewvc?view=rev&rev=517941
Log:
Implement a more useful executor: it keeps adding threads until it reaches max threads, and only then starts queueing connections. This yields much better fairness.

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml

Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?view=diff&rev=517941&r1=517940&r2=517941
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java Tue Mar 13 16:54:03 2007
@@ -256,6 +256,10 @@
     public void setExecutor(Executor executor) {
         ep.setExecutor(executor);
     }
+    
+    public void setUseExecutor(boolean useexec) {
+        ep.setUseExecutor(useexec);
+    }
 
     public int getMaxThreads() {
         return ep.getMaxThreads();

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=517941&r1=517940&r2=517941
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Mar 13 16:54:03 2007
@@ -51,6 +51,8 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Collection;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * NIO tailored thread pool, providing the following services:
@@ -268,7 +270,10 @@
     protected Executor executor = null;
     public void setExecutor(Executor executor) { this.executor = executor; }
     public Executor getExecutor() { return executor; }
-
+    
+    protected boolean useExecutor = true;
+    public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
+    public boolean getUseExecutor() { return useExecutor;}
 
     /**
      * Maximum amount of worker threads.
@@ -639,7 +644,7 @@
 
 
     /**
-     * Start the APR endpoint, creating acceptor, poller threads.
+     * Start the NIO endpoint, creating acceptor, poller threads.
      */
     public void start()
         throws Exception {
@@ -652,9 +657,15 @@
             paused = false;
             
             // Create worker collection
-            if (executor == null) {
-                //workers = new WorkerStack(maxThreads);
-                executor = new ThreadPoolExecutor(getMaxThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
+            if (getUseExecutor()) {
+                if ( executor == null ) {
+                    TaskQueue taskqueue = new TaskQueue();
+                    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
+                    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+                    taskqueue.setParent( (ThreadPoolExecutor) executor);
+                }
+            } else {
+                workers = new WorkerStack(maxThreads);
             }
 
             // Start acceptor threads
@@ -716,6 +727,13 @@
         eventCache.clear();
         keyCache.clear();
         nioChannels.clear();
+        if ( executor!=null ) {
+            ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
+            tpe.shutdown();
+            TaskQueue queue = (TaskQueue)tpe.getQueue();
+            queue.setParent(null);
+            executor = null;
+        }
     }
 
 
@@ -863,6 +881,8 @@
      */
     protected boolean isWorkerAvailable() {
         if ( executor != null ) {
+//            ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
+//            TaskQueue queue = (TaskQueue)tpe.getQueue();
             return true;
         } else {
             if (workers.size() > 0) {
@@ -953,23 +973,6 @@
             workers.notify();
         }
     }
-
-
-    protected boolean processSocket(SocketChannel socket) {
-        try {
-            if (executor == null) {
-                getWorkerThread().assign(socket);
-            }  else {
-                executor.execute(new SocketOptionsProcessor(socket));
-            }
-        } catch (Throwable t) {
-            // This means we got an OOM or similar creating a thread, or that
-            // the pool and its queue are full
-            log.error(sm.getString("endpoint.process.fail"), t);
-            return false;
-        }
-        return true;
-    }
     /**
      * Process given socket.
      */
@@ -978,7 +981,7 @@
             if (executor == null) {
                 getWorkerThread().assign(socket);
             }  else {
-                executor.execute(new SocketProcessor(socket));
+                executor.execute(new SocketProcessor(socket,null));
             }
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
@@ -998,7 +1001,7 @@
             if (executor == null) {
                 getWorkerThread().assign(socket, status);
             } else {
-                executor.execute(new SocketEventProcessor(socket, status));
+                executor.execute(new SocketProcessor(socket, status));
             }
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
@@ -1298,9 +1301,8 @@
                 while (iterator != null && iterator.hasNext()) {
                     SelectionKey sk = (SelectionKey) iterator.next();
                     KeyAttachment attachment = (KeyAttachment)sk.attachment();
-                    if ( processKey(sk, attachment) ) {
-                        iterator.remove(); //only remove it if the key was processed.
-                    } 
+                    iterator.remove();
+                    processKey(sk, attachment);
                 }//while
                 
                 //process timeouts
@@ -1691,6 +1693,7 @@
             thread = new Thread(this);
             thread.setName(getName() + "-" + (++curThreads));
             thread.setDaemon(true);
+            thread.setPriority(getThreadPriority());
             thread.start();
         }
 
@@ -1827,15 +1830,19 @@
     protected class SocketProcessor implements Runnable {
 
         protected NioChannel socket = null;
+        protected SocketStatus status = null; 
 
-        public SocketProcessor(NioChannel socket) {
+        public SocketProcessor(NioChannel socket, SocketStatus status) {
             this.socket = socket;
+            this.status = status;
         }
-
+         
         public void run() {
 
             // Process the request from this socket
-            if (handler.process(socket) == Handler.SocketState.CLOSED) {
+            boolean closed = (status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) :
+                                            (handler.event(socket,status)==Handler.SocketState.CLOSED);
+            if (closed) {
                 // Close socket and pool
                 try {
                     try {socket.close();}catch (Exception ignore){}
@@ -1844,47 +1851,56 @@
                     log.error("",x);
                 }
                 socket = null;
+                status = null;
             }
 
         }
 
     }
+    // ---------------------------------------------- TaskQueue Inner Class
+    public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
+        ThreadPoolExecutor parent = null;
+        
+        public TaskQueue() {
+            super();
+        }
 
-
-    // --------------------------------------- SocketEventProcessor Inner Class
-
-
-    /**
-     * This class is the equivalent of the Worker, but will simply use in an
-     * external Executor thread pool.
-     */
-    protected class SocketEventProcessor implements Runnable {
-
-        protected NioChannel socket = null;
-        protected SocketStatus status = null; 
-
-        public SocketEventProcessor(NioChannel socket, SocketStatus status) {
-            this.socket = socket;
-            this.status = status;
+        public TaskQueue(int initialCapacity) {
+            super(initialCapacity);
+        }
+ 
+        public TaskQueue(Collection<? extends Runnable> c) {
+            super(c);
         }
 
-        public void run() {
+        
+        public void setParent(ThreadPoolExecutor tp) {
+            parent = tp;
+        }
+        
+        public boolean offer(Runnable o) {
+            if ( parent != null && parent.getPoolSize()<parent.getMaximumPoolSize() ) return false;
+            else return super.offer(o);
+        }
+    }
 
-            // Process the request from this socket
-            if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
-                // Close socket and pool
-                try {
-                    try {socket.close();}catch (Exception ignore){}
-                    if ( socket.isOpen() ) socket.close(true);
-                } catch ( Exception x ) {
-                    log.error("",x);
-                }
-                socket = null;
-            }
+    // ---------------------------------------------- ThreadFactory Inner Class
+    class TaskThreadFactory implements ThreadFactory {
+        final ThreadGroup group;
+        final AtomicInteger threadNumber = new AtomicInteger(1);
+        final String namePrefix;
 
+        TaskThreadFactory(String namePrefix) {
+            SecurityManager s = System.getSecurityManager();
+            group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+            this.namePrefix = namePrefix;
         }
 
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
+            t.setDaemon(true);
+            t.setPriority(getThreadPriority());
+            return t;
+        }
     }
-
-
 }

Modified: tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml?view=diff&rev=517941&r1=517940&r2=517941
==============================================================================
--- tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml (original)
+++ tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml Tue Mar 13 16:54:03 2007
@@ -395,6 +395,14 @@
     the -Djava.net.preferIPv4Stack=true value to your command line</p>
 
     <attributes>
+      <attribute name="useExecutor" required="false">
+        Set to true to use the NIO thread pool executor. The default value is <code>true</code>.
+        If set to false, it uses a thread pool based on a stack for its execution.
+        Generally, using the executor yields slightly slower performance, but better
+        fairness when processing connections in a high load environment, as the traffic gets queued through a
+        FIFO queue. If set to true (the default), then the max pool size is the <code>maxThreads</code> attribute
+        and the core pool size is the <code>minSpareThreads</code> attribute.
+      </attribute>
       <attribute name="acceptorThreadCount" required="false">
         <p>The number of threads to be used to accept connections. Increase this value on a multi CPU machine,
         although you would never really need more than 2. Also, with a lot of non keep alive connections,



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org