You are viewing a plain text version of this content. The canonical link for it is here.
Posted to droids-commits@incubator.apache.org by th...@apache.org on 2009/09/02 09:53:29 UTC
svn commit: r810437 -
/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
Author: thorsten
Date: Wed Sep 2 09:53:29 2009
New Revision: 810437
URL: http://svn.apache.org/viewvc?rev=810437&view=rev
Log:
DROIDS-51
review MultiThreadedTaskMaster thread creation/looping
due-to: Ryan McKinley
Modified:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=810437&r1=810436&r2=810437&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java Wed Sep 2 09:53:29 2009
@@ -22,8 +22,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -58,8 +60,9 @@
public class MultiThreadedTaskMaster<T extends Task> implements TaskMaster<T> {
protected final Log log = LogFactory.getLog(MultiThreadedTaskMaster.class);
private static final long KEEP_ALIVE = 50000L;
- private static final long TICKLE_TIME = 100L;
+ private static final long TICKLE_TIME = 250L;
+ protected String name = getClass().getName();
protected ThreadPoolExecutor pool = null;
protected int maxThreads = 0;
protected TaskQueue<T> queue = null;
@@ -74,6 +77,35 @@
protected AtomicLong completedCount = new AtomicLong();
protected TaskExceptionHandler exHandler;
+
+ class DroidsThreadFactory implements ThreadFactory {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String prefix;
+
+ DroidsThreadFactory( String descr ) {
+ prefix = (descr==null) ? name+"-" : name+"-"+descr+"-";
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null)? s.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+ }
+
+ DroidsThreadFactory() {
+ this( null );
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r,
+ prefix + threadNumber.getAndIncrement(),
+ 0);
+ if (t.isDaemon())
+ t.setDaemon(false);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ }
+
/**
* The queue has been initialized
*/
@@ -99,38 +131,50 @@
// start the pool
int poolSize = getMaxThreads();
// user may set a thread pool before calling the processAllTasks
- this.pool = this.pool != null ? this.pool : new ThreadPoolExecutor(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(poolSize));
+ this.pool = this.pool != null ? this.pool : new ThreadPoolExecutor(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(poolSize), new DroidsThreadFactory( "runner" ) );
this.pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//task is not yet run, there is no impact to discard fail to submit task
if (log.isInfoEnabled()) {
log.info("processAllTasks() - created ThreadPoolExecutor - poolSize: " + poolSize);
}
// process tasks in a new thread
- ExecutorService executor = Executors.newSingleThreadExecutor();
+ ExecutorService executor = Executors.newSingleThreadExecutor( new DroidsThreadFactory( "main" ) );
executor.execute( new Runnable() {
@Override
public void run() {
- while (queue.hasNext() || pool.getActiveCount() > 0) {
- if (log.isTraceEnabled()) {
- log.trace("processAllTasks() - pool.activeCount: " + pool.getActiveCount() + ", pool.maximumPoolSize(): " + pool.getMaximumPoolSize());
+ while( state == ExecutionState.RUNNING && !pool.isTerminating() ) {
+ if( queue.hasNext() ) {
+ if( pool.getActiveCount() < pool.getMaximumPoolSize() ) {
+ submitNewTaskRunner(pool, queue);
+ continue; // skip the rest of the loop
+ }
+ else {
+ // pool is full but we have more to do
+ //log.info("More tasks to process, but pool is full");
+ }
+ }
+ else {
+ // no new tasks to process
+ //log.info("No tasks to process....");
}
- if (queue.hasNext() && pool.getActiveCount() < pool.getMaximumPoolSize()) {
- nextTask(pool, queue);
+ // Try sleeping for a little bit
+ try {
+ if (log.isDebugEnabled())
+ log.debug("processAllTasks() - no task or thread pool is full, to check again in " + TICKLE_TIME + "ms - queue.size: " + queue.getSize());
+ Thread.sleep(TICKLE_TIME);
}
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("processAllTasks() - no task or thread pool is full, to check again in " + TICKLE_TIME + "ms - queue.size: " + queue.getSize());
- Thread.sleep(TICKLE_TIME);
- }
- catch (InterruptedException e) {
- log.error(e);
- }
+ catch (InterruptedException e) {
+ log.error(e);
+ }
+
+ // Check if we are done
+ if( !queue.hasNext() && pool.getActiveCount() < 1 ) {
+ log.info("Finshed invocation, waiting for workers to finish.");
+ terminate();
+ break;
}
}
- state = ExecutionState.COMPLETE;
- log.info("Finshed invocation, waiting for workers to finish.");
}
});
}
@@ -152,77 +196,78 @@
* @param queue
* @return
*/
- protected Future nextTask(ExecutorService executor, final TaskQueue<? extends T> queue) {
+ protected Future submitNewTaskRunner(ExecutorService executor, final TaskQueue<? extends T> queue) {
return executor.submit(new Runnable() {
public void run() {
String threadName = Thread.currentThread().getName();
- T task = queue.next();
- long delay = (delayTimer != null) ? delayTimer.getDelayMillis() : 0;
- if (log.isDebugEnabled())
- log.debug("run() - begin - thread: " + threadName + ", task: " + task.getId() + ", delay: " + delay);
- Worker<T> worker = null;
- Exception ex = null;
- try {
- if( delay > 0 ) {
- Thread.sleep(delay); // gets the current thread
- }
- worker = droid.getNewWorker();
- if( monitor != null ) {
- monitor.beforeExecute( task, worker );
- }
- worker.execute(task);
- lastCompletedTask = task;
- } catch (DroidsException e) {
- ex = e;
- } catch (IOException e) {
- ex = e;
- } catch (InterruptedException e) {
- ex = e;
- }
- finally {
- // Handle any exceptions
- boolean terminate = false;
- if (ex != null) {
- try {
- TaskExceptionResult result = TaskExceptionResult.WARN;
- if (exHandler != null) {
- result = exHandler.handleException(ex);
+ // use the same thread to keep processing
+ while( queue.hasNext() && pool.getActiveCount() <= maxThreads && !pool.isTerminating() ) {
+ T task = queue.next();
+
+ long delay = (delayTimer != null) ? delayTimer.getDelayMillis() : 0;
+ if (log.isDebugEnabled())
+ log.debug("run() - begin - thread: " + threadName + ", task: " + task.getId() + ", delay: " + delay);
+
+ Worker<T> worker = null;
+ Exception ex = null;
+ try {
+ if( delay > 0 ) {
+ Thread.sleep(delay); // gets the current thread
+ }
+ worker = droid.getNewWorker();
+ if( monitor != null ) {
+ monitor.beforeExecute( task, worker );
+ }
+ worker.execute(task);
+ lastCompletedTask = task;
+ } catch (DroidsException e) {
+ ex = e;
+ } catch (IOException e) {
+ ex = e;
+ } catch (InterruptedException e) {
+ ex = e;
+ }
+ finally {
+ // Handle any exceptions
+ boolean terminate = false;
+ if (ex != null) {
+ try {
+ TaskExceptionResult result = TaskExceptionResult.WARN;
+ if (exHandler != null) {
+ result = exHandler.handleException(ex);
+ }
+ switch (result) {
+ case WARN:
+ log.warn(ex.toString(), ex);
+ break;
+ case FATAL:
+ log.warn(ex.getMessage(), ex);
+ terminate = true;
+ break;
+ case IGNORE: break; // nothing
+ }
}
- switch (result) {
- case WARN:
- log.warn(ex.toString(), ex);
- break;
- case FATAL:
- log.warn(ex.getMessage(), ex);
- terminate = true;
- break;
- case IGNORE: break; // nothing
+ catch( Exception e2 ) {
+ log.error( e2.getMessage(), e2 );
}
}
- catch( Exception e2 ) {
- log.error( e2.getMessage(), e2 );
- }
- }
- if( monitor != null ) {
- monitor.afterExecute( task, worker, ex );
- }
-
- completedCount.incrementAndGet();
- if (log.isInfoEnabled()) {
- log.info("run() - done - completedCount: " + completedCount + ", thread: " + threadName +
- ", task: " + task.getId() + ", queue.size: " + queue.getSize() +
- ", pool.activeCount: " + pool.getActiveCount());
- }
+ if( monitor != null ) {
+ monitor.afterExecute( task, worker, ex );
+ }
+
+ completedCount.incrementAndGet();
+ if (log.isInfoEnabled()) {
+ log.info("run() - done - completedCount: " + completedCount + ", thread: " + threadName +
+ ", task: " + task.getId() + ", queue.size: " + queue.getSize() +
+ ", pool.activeCount: " + pool.getActiveCount());
+ }
- if( terminate ) {
- terminate();
- }
- else if (!queue.hasNext() && pool.getActiveCount() == 1) { //TODO it isn't a very good idea to check the activeCount inside the thread. an alternative way is to use the future to track the thread status and do the termination
- log.info("run() - no more queued task and active threads, set to terminate");
- terminate();
- }
+ if( terminate ) {
+ terminate();
+ }
+ } // loop
}
}
});
@@ -289,7 +334,7 @@
if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
log.info("SHUT DOWN NOW");
pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being canceled
+ // Wait a while for to respond to being canceled
if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
log.info("Pool did not terminate");
}
@@ -345,4 +390,11 @@
this.monitor = monitor;
}
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
}