You are viewing a plain text version of this content. The canonical link for it is here.
Posted to droids-commits@incubator.apache.org by th...@apache.org on 2009/09/02 09:53:29 UTC

svn commit: r810437 - /incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java

Author: thorsten
Date: Wed Sep  2 09:53:29 2009
New Revision: 810437

URL: http://svn.apache.org/viewvc?rev=810437&view=rev
Log:
DROIDS-51
review MultiThreadedTaskMaster thread creation/looping

due-to: Ryan McKinley

Modified:
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=810437&r1=810436&r2=810437&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java Wed Sep  2 09:53:29 2009
@@ -22,8 +22,10 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -58,8 +60,9 @@
 public class MultiThreadedTaskMaster<T extends Task> implements TaskMaster<T> {
   protected final Log log = LogFactory.getLog(MultiThreadedTaskMaster.class);
   private static final long KEEP_ALIVE = 50000L;
-  private static final long TICKLE_TIME = 100L;
+  private static final long TICKLE_TIME = 250L;
 
+  protected String name = getClass().getName();
   protected ThreadPoolExecutor pool = null;
   protected int maxThreads = 0;
   protected TaskQueue<T> queue = null;
@@ -74,6 +77,35 @@
   protected AtomicLong completedCount = new AtomicLong();
   protected TaskExceptionHandler exHandler;
 
+  
+  class DroidsThreadFactory implements ThreadFactory {
+    final ThreadGroup group;
+    final AtomicInteger threadNumber = new AtomicInteger(1);
+    final String prefix;
+
+    DroidsThreadFactory( String descr ) {
+      prefix = (descr==null) ? name+"-" : name+"-"+descr+"-";
+      SecurityManager s = System.getSecurityManager();
+      group = (s != null)? s.getThreadGroup() :
+                           Thread.currentThread().getThreadGroup();
+    }
+
+    DroidsThreadFactory() {
+      this( null );
+    }
+
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(group, r,
+                            prefix + threadNumber.getAndIncrement(),
+                            0);
+      if (t.isDaemon())
+          t.setDaemon(false);
+      if (t.getPriority() != Thread.NORM_PRIORITY)
+          t.setPriority(Thread.NORM_PRIORITY);
+      return t;
+    }
+  }
+  
   /**
    * The queue has been initialized
    */
@@ -99,38 +131,50 @@
     // start the pool
     int poolSize = getMaxThreads();
     // user may set a thread pool before calling the processAllTasks
-    this.pool = this.pool != null ? this.pool : new ThreadPoolExecutor(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(poolSize));
+    this.pool = this.pool != null ? this.pool : new ThreadPoolExecutor(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(poolSize), new DroidsThreadFactory( "runner" ) );
     this.pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//task is not yet run, there is no impact to discard fail to submit task
     if (log.isInfoEnabled()) {
       log.info("processAllTasks() - created ThreadPoolExecutor - poolSize: " + poolSize);
     }
     
     // process tasks in a new thread
-    ExecutorService executor = Executors.newSingleThreadExecutor();
+    ExecutorService executor = Executors.newSingleThreadExecutor( new DroidsThreadFactory( "main" ) );
     executor.execute( new Runnable() {
       @Override
       public void run() {
-        while (queue.hasNext() || pool.getActiveCount() > 0) {
-          if (log.isTraceEnabled()) {
-            log.trace("processAllTasks() - pool.activeCount: " + pool.getActiveCount() + ", pool.maximumPoolSize(): " + pool.getMaximumPoolSize());
+        while( state == ExecutionState.RUNNING && !pool.isTerminating() ) {
+          if( queue.hasNext() ) {
+            if( pool.getActiveCount() < pool.getMaximumPoolSize() ) {
+              submitNewTaskRunner(pool, queue);
+              continue; // skip the rest of the loop
+            }
+            else {
+              // pool is full but we have more to do
+              //log.info("More tasks to process, but pool is full");
+            }
+          }
+          else { 
+            // no new tasks to process
+            //log.info("No tasks to process....");
           }
           
-          if (queue.hasNext() && pool.getActiveCount() < pool.getMaximumPoolSize()) {
-            nextTask(pool, queue);
+          // Try sleeping for a little bit
+          try {
+            if (log.isDebugEnabled()) 
+              log.debug("processAllTasks() - no task or thread pool is full, to check again in " + TICKLE_TIME + "ms - queue.size: " + queue.getSize());
+            Thread.sleep(TICKLE_TIME);
           } 
-          else {
-            try {
-              if (log.isDebugEnabled()) 
-                log.debug("processAllTasks() - no task or thread pool is full, to check again in " + TICKLE_TIME + "ms - queue.size: " + queue.getSize());
-              Thread.sleep(TICKLE_TIME);
-            } 
-            catch (InterruptedException e) {
-              log.error(e);
-            }
+          catch (InterruptedException e) {
+            log.error(e);
+          }
+          
+          // Check if we are done
+          if( !queue.hasNext() && pool.getActiveCount() < 1 ) {
+            log.info("Finshed invocation, waiting for workers to finish.");
+            terminate();
+            break;
           }
         }
-        state = ExecutionState.COMPLETE;
-        log.info("Finshed invocation, waiting for workers to finish.");
       }
     });
   }
@@ -152,77 +196,78 @@
    * @param queue
    * @return
    */
-  protected Future nextTask(ExecutorService executor, final TaskQueue<? extends T> queue) {
+  protected Future submitNewTaskRunner(ExecutorService executor, final TaskQueue<? extends T> queue) {
     return executor.submit(new Runnable() {
       public void run() {
         String threadName = Thread.currentThread().getName();
-        T task = queue.next();
-        long delay = (delayTimer != null) ? delayTimer.getDelayMillis() : 0;
-        if (log.isDebugEnabled())
-          log.debug("run() - begin - thread: " + threadName + ", task: " + task.getId() + ", delay: " + delay);
         
-        Worker<T> worker = null;
-        Exception ex = null;
-        try {
-          if( delay > 0 ) {
-            Thread.sleep(delay); // gets the current thread
-          }
-          worker = droid.getNewWorker();
-          if( monitor != null ) {
-            monitor.beforeExecute( task, worker );
-          }
-          worker.execute(task);
-          lastCompletedTask = task;
-        } catch (DroidsException e) {
-          ex = e;
-        } catch (IOException e) {
-          ex = e;
-        } catch (InterruptedException e) {
-          ex = e;
-        } 
-        finally {
-          // Handle any exceptions
-          boolean terminate = false;
-          if (ex != null) {
-            try {
-              TaskExceptionResult result = TaskExceptionResult.WARN;  
-              if (exHandler != null) {
-                result = exHandler.handleException(ex); 
+        // use the same thread to keep processing
+        while( queue.hasNext() && pool.getActiveCount() <= maxThreads && !pool.isTerminating() ) {
+          T task = queue.next();
+          
+          long delay = (delayTimer != null) ? delayTimer.getDelayMillis() : 0;
+          if (log.isDebugEnabled())
+            log.debug("run() - begin - thread: " + threadName + ", task: " + task.getId() + ", delay: " + delay);
+          
+          Worker<T> worker = null;
+          Exception ex = null;
+          try {
+            if( delay > 0 ) {
+              Thread.sleep(delay); // gets the current thread
+            }
+            worker = droid.getNewWorker();
+            if( monitor != null ) {
+              monitor.beforeExecute( task, worker );
+            }
+            worker.execute(task);
+            lastCompletedTask = task;
+          } catch (DroidsException e) {
+            ex = e;
+          } catch (IOException e) {
+            ex = e;
+          } catch (InterruptedException e) {
+            ex = e;
+          } 
+          finally {
+            // Handle any exceptions
+            boolean terminate = false;
+            if (ex != null) {
+              try {
+                TaskExceptionResult result = TaskExceptionResult.WARN;  
+                if (exHandler != null) {
+                  result = exHandler.handleException(ex); 
+                }
+                switch (result) {
+                case WARN:
+                  log.warn(ex.toString(), ex);
+                  break;
+                case FATAL:
+                  log.warn(ex.getMessage(), ex);
+                  terminate = true;
+                  break;
+                case IGNORE: break; // nothing
+                }
               }
-              switch (result) {
-              case WARN:
-                log.warn(ex.toString(), ex);
-                break;
-              case FATAL:
-                log.warn(ex.getMessage(), ex);
-                terminate = true;
-                break;
-              case IGNORE: break; // nothing
+              catch( Exception e2 ) {
+                log.error( e2.getMessage(), e2 );
               }
             }
-            catch( Exception e2 ) {
-              log.error( e2.getMessage(), e2 );
-            }
-          }
 
-          if( monitor != null ) {
-            monitor.afterExecute( task, worker, ex );
-          }
-          
-          completedCount.incrementAndGet();
-          if (log.isInfoEnabled()) {
-            log.info("run() - done - completedCount: " + completedCount + ", thread: " + threadName +
-                ", task: " + task.getId() + ", queue.size: " + queue.getSize() +
-                ", pool.activeCount: " + pool.getActiveCount());
-          }
+            if( monitor != null ) {
+              monitor.afterExecute( task, worker, ex );
+            }
+            
+            completedCount.incrementAndGet();
+            if (log.isInfoEnabled()) {
+              log.info("run() - done - completedCount: " + completedCount + ", thread: " + threadName +
+                  ", task: " + task.getId() + ", queue.size: " + queue.getSize() +
+                  ", pool.activeCount: " + pool.getActiveCount());
+            }
 
-          if( terminate ) {
-            terminate();
-          }
-          else if (!queue.hasNext() && pool.getActiveCount() == 1) { //TODO it isn't a very good idea to check the activeCount inside the thread. an alternative way is to use the future to track the thread status and do the termination
-            log.info("run() - no more queued task and active threads, set to terminate");
-            terminate();
-          }
+            if( terminate ) {
+              terminate();
+            }
+          } // loop
         }
       }
     });
@@ -289,7 +334,7 @@
       if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
         log.info("SHUT DOWN NOW");
         pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being canceled
+        // Wait a while for tasks to respond to being canceled
         if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
           log.info("Pool did not terminate");
         }
@@ -345,4 +390,11 @@
     this.monitor = monitor;
   }
 
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
 }