You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/12/15 02:35:07 UTC
svn commit: r487420 - in
/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport:
AbstractRxTask.java ReceiverBase.java RxTaskPool.java bio/BioReceiver.java
bio/BioReplicationTask.java nio/NioReceiver.java nio/NioReplicationTask.java
Author: fhanik
Date: Thu Dec 14 17:35:06 2006
New Revision: 487420
URL: http://svn.apache.org/viewvc?view=rev&rev=487420
Log:
Implemented the use of an executor, tasks that aren't able to run quite yet, will be added to a pool of events, this pool of events is a zero GC (RxTaskPool) and is bounded.
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java Thu Dec 14 17:35:06 2006
@@ -26,7 +26,7 @@
* @author Filip Hanik
* @version $Revision$ $Date$
*/
-public abstract class AbstractRxTask extends Thread
+public abstract class AbstractRxTask implements Runnable
{
public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
@@ -41,7 +41,7 @@
this.callback = callback;
}
- public void setPool(RxTaskPool pool) {
+ public void setTaskPool(RxTaskPool pool) {
this.pool = pool;
}
@@ -57,7 +57,7 @@
this.doRun = doRun;
}
- public RxTaskPool getPool() {
+ public RxTaskPool getTaskPool() {
return pool;
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Thu Dec 14 17:35:06 2006
@@ -19,6 +19,11 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
@@ -58,8 +63,10 @@
private long tcpSelectorTimeout = 5000;
//how many times to search for an available socket
private int autoBind = 100;
- private int maxThreads = 6;
+ private int maxThreads = 15;
private int minThreads = 6;
+ private int maxTasks = 100;
+ private int minTasks = 10;
private boolean tcpNoDelay = true;
private boolean soKeepAlive = false;
private boolean ooBInline = true;
@@ -69,11 +76,23 @@
private int soTrafficClass = 0x04 | 0x08 | 0x010;
private int timeout = 3000; //3 seconds
private boolean useBufferPool = true;
+
+ private Executor executor;
public ReceiverBase() {
}
+ public void start() throws IOException {
+ if ( executor == null ) {
+ executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
+ }
+ }
+
+ public void stop() {
+ if ( executor instanceof ExecutorService ) ((ExecutorService)executor).shutdown();
+ }
+
/**
* getMessageListener
*
@@ -270,7 +289,7 @@
return listener;
}
- public RxTaskPool getPool() {
+ public RxTaskPool getTaskPool() {
return pool;
}
@@ -335,6 +354,22 @@
return securePort;
}
+ public int getMinTasks() {
+ return minTasks;
+ }
+
+ public int getMaxTasks() {
+ return maxTasks;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public boolean isListening() {
+ return listen;
+ }
+
/**
* @deprecated use setSelectorTimeout
* @param selTimeout long
@@ -429,7 +464,20 @@
this.securePort = securePort;
}
+ public void setMinTasks(int minTasks) {
+ this.minTasks = minTasks;
+ }
+
+ public void setMaxTasks(int maxTasks) {
+ this.maxTasks = maxTasks;
+ }
+
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
public void heartbeat() {
//empty operation
}
+
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java Thu Dec 14 17:35:06 2006
@@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
/**
* @author not attributable
@@ -40,8 +41,8 @@
boolean running = true;
private static int counter = 1;
- private int maxThreads;
- private int minThreads;
+ private int maxTasks;
+ private int minTasks;
private TaskCreator creator = null;
@@ -50,34 +51,27 @@
}
- public RxTaskPool (int maxThreads, int minThreads, TaskCreator creator) throws Exception {
+ public RxTaskPool (int maxTasks, int minTasks, TaskCreator creator) throws Exception {
// fill up the pool with worker threads
- this.maxThreads = maxThreads;
- this.minThreads = minThreads;
+ this.maxTasks = maxTasks;
+ this.minTasks = minTasks;
this.creator = creator;
- //for (int i = 0; i < minThreads; i++) {
- for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
- AbstractRxTask thread = creator.getWorkerThread();
- setupThread(thread);
- idle.add (thread);
- }
}
- protected void setupThread(AbstractRxTask thread) {
- synchronized (thread) {
- thread.setPool(this);
- thread.setName(thread.getClass().getName() + "[" + inc() + "]");
- thread.setDaemon(true);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.start();
- try {thread.wait(500); }catch ( InterruptedException x ) {}
+ protected void configureTask(AbstractRxTask task) {
+ synchronized (task) {
+ task.setTaskPool(this);
+// task.setName(task.getClass().getName() + "[" + inc() + "]");
+// task.setDaemon(true);
+// task.setPriority(Thread.MAX_PRIORITY);
+// task.start();
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
- public AbstractRxTask getWorker()
+ public AbstractRxTask getRxTask()
{
AbstractRxTask worker = null;
synchronized (mutex) {
@@ -89,9 +83,9 @@
//this means that there are no available workers
worker = null;
}
- } else if ( used.size() < this.maxThreads && creator != null) {
- worker = creator.getWorkerThread();
- setupThread(worker);
+ } else if ( used.size() < this.maxTasks && creator != null) {
+ worker = creator.createRxTask();
+ configureTask(worker);
} else {
try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
}
@@ -114,7 +108,7 @@
synchronized (mutex) {
used.remove(worker);
//if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
- if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
+ if ( idle.size() < maxTasks && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
else {
worker.setDoRun(false);
synchronized (worker){worker.notify();}
@@ -128,11 +122,11 @@
}
public int getMaxThreads() {
- return maxThreads;
+ return maxTasks;
}
public int getMinThreads() {
- return minThreads;
+ return minTasks;
}
public void stop() {
@@ -147,19 +141,19 @@
}
}
- public void setMaxThreads(int maxThreads) {
- this.maxThreads = maxThreads;
+ public void setMaxTasks(int maxThreads) {
+ this.maxTasks = maxThreads;
}
- public void setMinThreads(int minThreads) {
- this.minThreads = minThreads;
+ public void setMinTasks(int minThreads) {
+ this.minTasks = minThreads;
}
- public TaskCreator getThreadCreator() {
+ public TaskCreator getTaskCreator() {
return this.creator;
}
- public static interface TaskCreator {
- public AbstractRxTask getWorkerThread();
+ public static interface TaskCreator {
+ public AbstractRxTask createRxTask();
}
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java Thu Dec 14 17:35:06 2006
@@ -53,6 +53,7 @@
* @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
*/
public void start() throws IOException {
+ super.start();
try {
setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
@@ -73,7 +74,7 @@
}
}
- public AbstractRxTask getWorkerThread() {
+ public AbstractRxTask createRxTask() {
return getReplicationThread();
}
@@ -93,6 +94,7 @@
try {
this.serverSocket.close();
}catch ( Exception x ) {}
+ super.stop();
}
@@ -125,20 +127,21 @@
while ( doListen() ) {
Socket socket = null;
- if ( getPool().available() < 1 ) {
+ if ( getTaskPool().available() < 1 ) {
if ( log.isWarnEnabled() )
log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
}
- BioReplicationTask thread = (BioReplicationTask)getPool().getWorker();
- if ( thread == null ) continue; //should never happen
+ BioReplicationTask task = (BioReplicationTask)getTaskPool().getRxTask();
+ if ( task == null ) continue; //should never happen
try {
socket = serverSocket.accept();
}catch ( Exception x ) {
if ( doListen() ) throw x;
}
if ( !doListen() ) {
- thread.setDoRun(false);
- thread.serviceSocket(null,null);
+ task.setDoRun(false);
+ task.serviceSocket(null,null);
+ getExecutor().execute(task);
break; //regular shutdown
}
if ( socket == null ) continue;
@@ -152,7 +155,7 @@
socket.setTrafficClass(getSoTrafficClass());
socket.setSoTimeout(getTimeout());
ObjectReader reader = new ObjectReader(socket);
- thread.serviceSocket(socket,reader);
+ task.serviceSocket(socket,reader);
}//while
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java Thu Dec 14 17:35:06 2006
@@ -59,32 +59,19 @@
// loop forever waiting for work to do
public synchronized void run()
{
- this.notify();
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled())
- log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if ( socket == null ) continue;
- try {
- drainSocket();
- } catch ( Exception x ) {
- log.error("Unable to service bio socket");
- }finally {
- try {socket.close();}catch ( Exception ignore){}
- try {reader.close();}catch ( Exception ignore){}
- reader = null;
- socket = null;
- }
- // done, ready for more, return to pool
- if ( getPool() != null ) getPool().returnWorker (this);
- else setDoRun(false);
+ if ( socket == null ) return;
+ try {
+ drainSocket();
+ } catch ( Exception x ) {
+ log.error("Unable to service bio socket");
+ }finally {
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
}
+ // done, ready for more, return to pool
+ if ( getTaskPool() != null ) getTaskPool().returnWorker (this);
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Thu Dec 14 17:35:06 2006
@@ -80,6 +80,7 @@
public void stop() {
this.stopListening();
+ super.stop();
}
/**
@@ -88,8 +89,8 @@
* @see org.apache.catalina.tribes.ClusterReceiver#start()
*/
public void start() throws IOException {
+ super.start();
try {
-// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
} catch (Exception x) {
log.fatal("ThreadPool can initilzed. Listener not started", x);
@@ -109,7 +110,7 @@
}
}
- public AbstractRxTask getWorkerThread() {
+ public AbstractRxTask createRxTask() {
NioReplicationTask thread = new NioReplicationTask(this,this);
thread.setUseBufferPool(this.getUseBufferPool());
thread.setRxBufSize(getRxBufSize());
@@ -142,7 +143,7 @@
events.add(event);
}
if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
- selector.wakeup();
+ if ( isListening() && selector!=null ) selector.wakeup();
}
}
@@ -177,7 +178,9 @@
protected void socketTimeouts() {
//timeout
- Set keys = selector.keys();
+ Selector tmpsel = selector;
+ Set keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null;
+ if ( keys == null ) return;
long now = System.currentTimeMillis();
for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
SelectionKey key = (SelectionKey) iter.next();
@@ -365,17 +368,18 @@
* will then de-register the channel on the next select call.
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
- NioReplicationTask worker = (NioReplicationTask) getPool().getWorker();
- if (worker == null) {
- // No threads available, do nothing, the selection
+ NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
+ if (task == null) {
+ // No threads/tasks available, do nothing, the selection
// loop will keep calling this method until a
// thread becomes available, the thread pool itself has a waiting mechanism
// so we will not wait here.
- if (log.isDebugEnabled())
- log.debug("No TcpReplicationThread available");
+ if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
} else {
// invoking this wakes up the worker thread then returns
- worker.serviceChannel(key);
+ //add task to thread pool
+ task.serviceChannel(key);
+ getExecutor().execute(task);
}
}
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Thu Dec 14 17:35:06 2006
@@ -63,60 +63,50 @@
// loop forever waiting for work to do
public synchronized void run() {
- this.notify();
if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
buffer = ByteBuffer.allocateDirect(getRxBufSize());
}else {
buffer = ByteBuffer.allocate (getRxBufSize());
}
- while (isDoRun()) {
- try {
- // sleep and release object lock
- this.wait();
- } catch (InterruptedException e) {
- if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
- // clear interrupt status
- Thread.interrupted();
- }
- if (key == null) {
- continue; // just in case
- }
- if ( log.isTraceEnabled() )
- log.trace("Servicing key:"+key);
- try {
- ObjectReader reader = (ObjectReader)key.attachment();
- if ( reader == null ) {
- if ( log.isTraceEnabled() )
- log.trace("No object reader, cancelling:"+key);
- cancelKey(key);
- } else {
- if ( log.isTraceEnabled() )
- log.trace("Draining channel:"+key);
+ if (key == null) {
+ return; // just in case
+ }
+ if ( log.isTraceEnabled() )
+ log.trace("Servicing key:"+key);
- drainChannel(key, reader);
- }
- } catch (Exception e) {
- //this is common, since the sockets on the other
- //end expire after a certain time.
- if ( e instanceof CancelledKeyException ) {
- //do nothing
- } else if ( e instanceof IOException ) {
- //dont spew out stack traces for IO exceptions unless debug is enabled.
- if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
- else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
- } else if ( log.isErrorEnabled() ) {
- //this is a real error, log it.
- log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
- }
+ try {
+ ObjectReader reader = (ObjectReader)key.attachment();
+ if ( reader == null ) {
+ if ( log.isTraceEnabled() )
+ log.trace("No object reader, cancelling:"+key);
cancelKey(key);
- } finally {
-
+ } else {
+ if ( log.isTraceEnabled() )
+ log.trace("Draining channel:"+key);
+
+ drainChannel(key, reader);
}
- key = null;
- // done, ready for more, return to pool
- getPool().returnWorker (this);
+ } catch (Exception e) {
+ //this is common, since the sockets on the other
+ //end expire after a certain time.
+ if ( e instanceof CancelledKeyException ) {
+ //do nothing
+ } else if ( e instanceof IOException ) {
+ //dont spew out stack traces for IO exceptions unless debug is enabled.
+ if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
+ else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
+ } else if ( log.isErrorEnabled() ) {
+ //this is a real error, log it.
+ log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+ }
+ cancelKey(key);
+ } finally {
+
}
+ key = null;
+ // done, ready for more, return to pool
+ getTaskPool().returnWorker (this);
}
/**
@@ -131,14 +121,12 @@
* worker thread is servicing it.
*/
public synchronized void serviceChannel (SelectionKey key) {
- if ( log.isTraceEnabled() )
- log.trace("About to service key:"+key);
+ if ( log.isTraceEnabled() ) log.trace("About to service key:"+key);
ObjectReader reader = (ObjectReader)key.attachment();
if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
this.key = key;
key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
- this.notify(); // awaken the thread
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org