You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2007/03/14 00:54:04 UTC
svn commit: r517941 - in /tomcat/tc6.0.x/trunk:
java/org/apache/coyote/http11/Http11NioProtocol.java
java/org/apache/tomcat/util/net/NioEndpoint.java webapps/docs/config/http.xml
Author: fhanik
Date: Tue Mar 13 16:54:03 2007
New Revision: 517941
URL: http://svn.apache.org/viewvc?view=rev&rev=517941
Log:
Implement the use of a useful executor: this executor increases its thread count until it reaches max threads, then starts queueing the connections. This yields much better fairness.
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml
Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?view=diff&rev=517941&r1=517940&r2=517941
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java Tue Mar 13 16:54:03 2007
@@ -256,6 +256,10 @@
public void setExecutor(Executor executor) {
ep.setExecutor(executor);
}
+
+ public void setUseExecutor(boolean useexec) {
+ ep.setUseExecutor(useexec);
+ }
public int getMaxThreads() {
return ep.getMaxThreads();
Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=517941&r1=517940&r2=517941
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Mar 13 16:54:03 2007
@@ -51,6 +51,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Collection;
+import java.util.concurrent.ThreadFactory;
/**
* NIO tailored thread pool, providing the following services:
@@ -268,7 +270,10 @@
protected Executor executor = null;
public void setExecutor(Executor executor) { this.executor = executor; }
public Executor getExecutor() { return executor; }
-
+
+ protected boolean useExecutor = true;
+ public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
+ public boolean getUseExecutor() { return useExecutor;}
/**
* Maximum amount of worker threads.
@@ -639,7 +644,7 @@
/**
- * Start the APR endpoint, creating acceptor, poller threads.
+ * Start the NIO endpoint, creating acceptor, poller threads.
*/
public void start()
throws Exception {
@@ -652,9 +657,15 @@
paused = false;
// Create worker collection
- if (executor == null) {
- //workers = new WorkerStack(maxThreads);
- executor = new ThreadPoolExecutor(getMaxThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
+ if (getUseExecutor()) {
+ if ( executor == null ) {
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
+ executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+ taskqueue.setParent( (ThreadPoolExecutor) executor);
+ }
+ } else {
+ workers = new WorkerStack(maxThreads);
}
// Start acceptor threads
@@ -716,6 +727,13 @@
eventCache.clear();
keyCache.clear();
nioChannels.clear();
+ if ( executor!=null ) {
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
+ tpe.shutdown();
+ TaskQueue queue = (TaskQueue)tpe.getQueue();
+ queue.setParent(null);
+ executor = null;
+ }
}
@@ -863,6 +881,8 @@
*/
protected boolean isWorkerAvailable() {
if ( executor != null ) {
+// ThreadPoolExecutor tpe = (ThreadPoolExecutor)executor;
+// TaskQueue queue = (TaskQueue)tpe.getQueue();
return true;
} else {
if (workers.size() > 0) {
@@ -953,23 +973,6 @@
workers.notify();
}
}
-
-
- protected boolean processSocket(SocketChannel socket) {
- try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketOptionsProcessor(socket));
- }
- } catch (Throwable t) {
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
- }
/**
* Process given socket.
*/
@@ -978,7 +981,7 @@
if (executor == null) {
getWorkerThread().assign(socket);
} else {
- executor.execute(new SocketProcessor(socket));
+ executor.execute(new SocketProcessor(socket,null));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
@@ -998,7 +1001,7 @@
if (executor == null) {
getWorkerThread().assign(socket, status);
} else {
- executor.execute(new SocketEventProcessor(socket, status));
+ executor.execute(new SocketProcessor(socket, status));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
@@ -1298,9 +1301,8 @@
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = (SelectionKey) iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
- if ( processKey(sk, attachment) ) {
- iterator.remove(); //only remove it if the key was processed.
- }
+ iterator.remove();
+ processKey(sk, attachment);
}//while
//process timeouts
@@ -1691,6 +1693,7 @@
thread = new Thread(this);
thread.setName(getName() + "-" + (++curThreads));
thread.setDaemon(true);
+ thread.setPriority(getThreadPriority());
thread.start();
}
@@ -1827,15 +1830,19 @@
protected class SocketProcessor implements Runnable {
protected NioChannel socket = null;
+ protected SocketStatus status = null;
- public SocketProcessor(NioChannel socket) {
+ public SocketProcessor(NioChannel socket, SocketStatus status) {
this.socket = socket;
+ this.status = status;
}
-
+
public void run() {
// Process the request from this socket
- if (handler.process(socket) == Handler.SocketState.CLOSED) {
+ boolean closed = (status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) :
+ (handler.event(socket,status)==Handler.SocketState.CLOSED);
+ if (closed) {
// Close socket and pool
try {
try {socket.close();}catch (Exception ignore){}
@@ -1844,47 +1851,56 @@
log.error("",x);
}
socket = null;
+ status = null;
}
}
}
+ // ---------------------------------------------- TaskQueue Inner Class
+ public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
+ ThreadPoolExecutor parent = null;
+
+ public TaskQueue() {
+ super();
+ }
-
- // --------------------------------------- SocketEventProcessor Inner Class
-
-
- /**
- * This class is the equivalent of the Worker, but will simply use in an
- * external Executor thread pool.
- */
- protected class SocketEventProcessor implements Runnable {
-
- protected NioChannel socket = null;
- protected SocketStatus status = null;
-
- public SocketEventProcessor(NioChannel socket, SocketStatus status) {
- this.socket = socket;
- this.status = status;
+ public TaskQueue(int initialCapacity) {
+ super(initialCapacity);
+ }
+
+ public TaskQueue(Collection<? extends Runnable> c) {
+ super(c);
}
- public void run() {
+
+ public void setParent(ThreadPoolExecutor tp) {
+ parent = tp;
+ }
+
+ public boolean offer(Runnable o) {
+ if ( parent != null && parent.getPoolSize()<parent.getMaximumPoolSize() ) return false;
+ else return super.offer(o);
+ }
+ }
- // Process the request from this socket
- if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
- // Close socket and pool
- try {
- try {socket.close();}catch (Exception ignore){}
- if ( socket.isOpen() ) socket.close(true);
- } catch ( Exception x ) {
- log.error("",x);
- }
- socket = null;
- }
+ // ---------------------------------------------- ThreadFactory Inner Class
+ class TaskThreadFactory implements ThreadFactory {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix;
+ TaskThreadFactory(String namePrefix) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ this.namePrefix = namePrefix;
}
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
+ t.setDaemon(true);
+ t.setPriority(getThreadPriority());
+ return t;
+ }
}
-
-
}
Modified: tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml?view=diff&rev=517941&r1=517940&r2=517941
==============================================================================
--- tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml (original)
+++ tomcat/tc6.0.x/trunk/webapps/docs/config/http.xml Tue Mar 13 16:54:03 2007
@@ -395,6 +395,14 @@
the -Djava.net.preferIPv4Stack=true value to your command line</p>
<attributes>
+ <attribute name="useExecutor" required="false">
+ Set to true to use the NIO thread pool executor. The default value is <code>true</code>.
+ If set to false, the connector uses a stack-based thread pool for its execution.
+ Generally, using the executor yields slightly slower performance, but provides better
+ fairness when processing connections in a high-load environment, as the traffic is queued through a
+ FIFO queue. If set to true (the default), the max pool size is the <code>maxThreads</code> attribute
+ and the core pool size is the <code>minSpareThreads</code> attribute.
+ </attribute>
<attribute name="acceptorThreadCount" required="false">
<p>The number of threads to be used to accept connections. Increase this value on a multi CPU machine,
although you would never really need more than 2. Also, with a lot of non keep alive connections,
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org