You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2008/12/08 04:32:08 UTC
svn commit: r724239 -
/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Author: fhanik
Date: Sun Dec 7 19:32:07 2008
New Revision: 724239
URL: http://svn.apache.org/viewvc?rev=724239&view=rev
Log:
Remove the synchronized/notifyAll-based thread pool, and use only the built-in pool
Modified:
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=724239&r1=724238&r2=724239&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Sun Dec 7 19:32:07 2008
@@ -116,12 +116,6 @@
/**
- * Available workers.
- */
- protected WorkerStack workers = null;
-
-
- /**
* Running state of the endpoint.
*/
protected volatile boolean running = false;
@@ -349,7 +343,11 @@
public Executor getExecutor() { return executor; }
protected boolean useExecutor = true;
- public void setUseExecutor(boolean useexec) { useExecutor = useexec;}
+ /**
+ * @deprecated Executor is always used
+ * @param useexec
+ */
+ public void setUseExecutor(boolean useexec) { log.info("Setting useExecutor is deprecated. Executors are always used.");}
public boolean getUseExecutor() { return useExecutor || (executor!=null);}
/**
@@ -359,14 +357,10 @@
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
if (running) {
- if (getUseExecutor() && executor!=null) {
+ if (executor!=null) {
if (executor instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads);
}
- }else if (workers!=null){
- synchronized(workers) {
- workers.resize(maxThreads);
- }
}
}
}
@@ -857,15 +851,11 @@
paused = false;
// Create worker collection
- if (getUseExecutor()) {
- if ( executor == null ) {
- TaskQueue taskqueue = new TaskQueue();
- TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
- executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
- taskqueue.setParent( (ThreadPoolExecutor) executor, this);
- }
- } else if ( executor == null ) {//avoid two thread pools being created
- workers = new WorkerStack(maxThreads);
+ if ( executor == null ) {
+ TaskQueue taskqueue = new TaskQueue();
+ TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
+ executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+ taskqueue.setParent( (ThreadPoolExecutor) executor, this);
}
// Start poller threads
@@ -1104,99 +1094,8 @@
protected boolean isWorkerAvailable() {
if ( executor != null ) {
return true;
- } else {
- if (workers.size() > 0) {
- return true;
- }
- if ( (maxThreads > 0) && (curThreads < maxThreads)) {
- return true;
- } else {
- if (maxThreads < 0) {
- return true;
- } else {
- return false;
- }
- }
- }
- }
- /**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
- */
- protected Worker createWorkerThread() {
-
- synchronized (workers) {
- if (workers.size() > 0) {
- curThreadsBusy++;
- return (workers.pop());
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- curThreadsBusy++;
- if (curThreadsBusy == maxThreads) {
- log.info(sm.getString("endpoint.info.maxThreads",
- Integer.toString(maxThreads), address,
- Integer.toString(port)));
- }
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
- }
-
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- protected Worker newWorkerThread() {
-
- Worker workerThread = new Worker();
- workerThread.start();
- return (workerThread);
-
- }
-
-
- /**
- * Return a new worker thread, and block while to worker is available.
- */
- protected Worker getWorkerThread() {
- // Allocate a new worker thread
- Worker workerThread = createWorkerThread();
- while (workerThread == null) {
- try {
- synchronized (workers) {
- workerThread = createWorkerThread();
- if ( workerThread == null ) workers.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- if ( workerThread == null ) workerThread = createWorkerThread();
- }
- return workerThread;
- }
-
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param workerThread The processor to be recycled
- */
- protected void recycleWorkerThread(Worker workerThread) {
- synchronized (workers) {
- workers.push(workerThread);
- curThreadsBusy--;
- workers.notify();
}
+ return false;
}
/**
* Process given socket.
@@ -1217,15 +1116,11 @@
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false);
attachment.setCometNotify(false); //will get reset upon next reg
- if (executor == null) {
- getWorkerThread().assign(socket, status);
- } else {
- SocketProcessor sc = processorCache.poll();
- if ( sc == null ) sc = new SocketProcessor(socket,status);
- else sc.reset(socket,status);
- if ( dispatch ) executor.execute(sc);
- else sc.run();
- }
+ SocketProcessor sc = processorCache.poll();
+ if ( sc == null ) sc = new SocketProcessor(socket,status);
+ else sc.reset(socket,status);
+ if ( dispatch && executor!=null ) executor.execute(sc);
+ else sc.run();
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
@@ -1888,162 +1783,6 @@
protected long lastRegistered = 0;
protected SendfileData sendfileData = null;
}
- // ----------------------------------------------------- Worker Inner Class
-
-
- /**
- * Server processor class.
- */
- protected class Worker implements Runnable {
-
-
- protected Thread thread = null;
- protected boolean available = false;
- protected Object socket = null;
- protected SocketStatus status = null;
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- protected synchronized void assign(Object socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- status = null;
- available = true;
- notifyAll();
-
- }
-
-
- protected synchronized void assign(Object socket, SocketStatus status) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- this.status = status;
- available = true;
- notifyAll();
- }
-
-
- /**
- * Await a newly assigned Socket from our Connector, or <code>null</code>
- * if we are supposed to shut down.
- */
- protected synchronized Object await() {
-
- // Wait for the Connector to provide a new Socket
- while (!available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Notify the Connector that we have received this Socket
- Object socket = this.socket;
- available = false;
- notifyAll();
-
- return (socket);
-
- }
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Process requests until we receive a shutdown signal
- while (running) {
- NioChannel socket = null;
- SelectionKey key = null;
- try {
- // Wait for the next socket to be assigned
- Object channel = await();
- if (channel == null)
- continue;
-
- if ( channel instanceof SocketChannel) {
- SocketChannel sc = (SocketChannel)channel;
- if ( !setSocketOptions(sc) ) {
- try {
- sc.socket().close();
- sc.close();
- }catch ( IOException ix ) {
- if ( log.isDebugEnabled() ) log.debug("",ix);
- }
- } else {
- //now we have it registered, remove it from the cache
-
- }
- } else {
- socket = (NioChannel)channel;
- SocketProcessor sc = processorCache.poll();
- if ( sc == null ) sc = new SocketProcessor(socket,status);
- else sc.reset(socket,status);
- sc.run();
- }
- }catch(CancelledKeyException cx) {
- if (socket!=null && key!=null) socket.getPoller().cancelledKey(key,null,false);
- } catch (OutOfMemoryError oom) {
- try {
- oomParachuteData = null;
- releaseCaches();
- log.error("", oom);
- }catch ( Throwable oomt ) {
- try {
- System.err.println(oomParachuteMsg);
- oomt.printStackTrace();
- }catch (Throwable letsHopeWeDontGetHere){}
- }
- } finally {
- //dereference socket to let GC do its job
- socket = null;
- // Finish up this request
- recycleWorkerThread(this);
- }
- }
- }
-
-
- /**
- * Start the background processing thread.
- */
- public void start() {
- thread = new Thread(this);
- thread.setName(getName() + "-" + (++curThreads));
- thread.setDaemon(true);
- thread.setPriority(getThreadPriority());
- thread.start();
- }
-
-
- }
// ------------------------------------------------ Application Buffer Handler
public class NioBufferHandler implements ApplicationBufferHandler {
@@ -2085,84 +1824,6 @@
}
- // ------------------------------------------------- WorkerStack Inner Class
-
-
- public class WorkerStack {
-
- protected Worker[] workers = null;
- protected int end = 0;
-
- public WorkerStack(int size) {
- workers = new Worker[size];
- }
-
- /**
- * Put the object into the queue. If the queue is full (for example if
- * the queue has been reduced in size) the object will be dropped.
- *
- * @param object the object to be appended to the queue (first
- * element).
- */
- public void push(Worker worker) {
- if (end < workers.length) {
- workers[end++] = worker;
- } else {
- curThreads--;
- }
- }
-
- /**
- * Get the first object out of the queue. Return null if the queue
- * is empty.
- */
- public Worker pop() {
- if (end > 0) {
- return workers[--end];
- }
- return null;
- }
-
- /**
- * Get the first object out of the queue, Return null if the queue
- * is empty.
- */
- public Worker peek() {
- return workers[end];
- }
-
- /**
- * Is the queue empty?
- */
- public boolean isEmpty() {
- return (end == 0);
- }
-
- /**
- * How many elements are there in this queue?
- */
- public int size() {
- return (end);
- }
-
- /**
- * Resize the queue. If there are too many objects in the queue for the
- * new size, drop the excess.
- *
- * @param newSize
- */
- public void resize(int newSize) {
- Worker[] newWorkers = new Worker[newSize];
- int len = workers.length;
- if (newSize < len) {
- len = newSize;
- }
- System.arraycopy(workers, 0, newWorkers, 0, len);
- workers = newWorkers;
- }
- }
-
-
// ---------------------------------------------- SocketProcessor Inner Class
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org