You are viewing a plain text version of this content. The canonical link for it is here.
Posted to droids-commits@incubator.apache.org by ry...@apache.org on 2009/01/16 00:19:00 UTC

svn commit: r734867 - in /incubator/droids/trunk/droids-core/src/main/java/org/apache/droids: api/PausableTaskMaster.java api/TaskMaster.java api/Worker.java impl/MultiThreadedTaskMaster.java impl/SequentialTaskMaster.java

Author: ryan
Date: Thu Jan 15 16:18:59 2009
New Revision: 734867

URL: http://svn.apache.org/viewvc?rev=734867&view=rev
Log:
DROIDS-38 -- pausable task master

Added:
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
Modified:
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java

Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java?rev=734867&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java Thu Jan 15 16:18:59 2009
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.api;
+
+
+/**
+ * A {@link TaskMaster} that can additionally be paused and resumed through {@link #pause()} and {@link #resume()}.
+ */
+public interface PausableTaskMaster<T extends Task> extends TaskMaster<T> {
+  void pause();  // ask the task master to suspend processing; the exact effect on tasks already in flight is left to the implementation
+  void resume(); // ask the task master to continue processing after a pause()
+}

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java Thu Jan 15 16:18:59 2009
@@ -16,7 +16,9 @@
  */
 package org.apache.droids.api;
 
+import java.util.Collection;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 
@@ -25,6 +27,12 @@
  * Responsible for running all the tasks
  */
 public interface TaskMaster<T extends Task> {
+  public enum ExecutionState { // lifecycle state of a task master, as returned by getExecutionState()
+    INITALIZED, // value the implementations start with; NOTE(review): misspelling of INITIALIZED, but renaming it would change the public API
+    RUNNING,    // set by the implementations when task processing starts (and again by resume())
+    PAUSED,     // set by PausableTaskMaster.pause() in the multi-threaded implementation
+    COMPLETE    // set by the implementations right before droid.finished() is called
+  };
  
   void processAllTasks( final TaskQueue<T> queue, final Droid<T> droid );
 
@@ -32,12 +40,11 @@
 
   Date getFinishedWorking();
 
-  boolean isWorking();
+  ExecutionState getExecutionState();
 
   int getCompletedTasks();
 
   T getLastCompletedTask();
   
   void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
-  
 }

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java Thu Jan 15 16:18:59 2009
@@ -17,6 +17,7 @@
 package org.apache.droids.api;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.droids.exception.DroidsException;
 

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java Thu Jan 15 16:18:59 2009
@@ -17,27 +17,30 @@
 package org.apache.droids.impl;
 
 import java.util.Date;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.droids.api.DelayTimer;
 import org.apache.droids.api.Droid;
+import org.apache.droids.api.PausableTaskMaster;
 import org.apache.droids.api.Task;
 import org.apache.droids.api.TaskExceptionHandler;
 import org.apache.droids.api.TaskExceptionResult;
-import org.apache.droids.api.TaskMaster;
 import org.apache.droids.api.TaskQueue;
 import org.apache.droids.api.Worker;
 import org.apache.droids.helper.Loggable;
 
 public class MultiThreadedTaskMaster<T extends Task> 
-                                      extends Loggable implements TaskMaster<T> 
+     extends Loggable implements PausableTaskMaster<T>
 {
   private static final long KEEP_ALIVE = 50000L;
   
-  private ThreadPoolExecutor pool = null;
+  private PausableThreadPoolExecutor pool = null;
   private ConcurrentHashMap<Long, WorkerRunner> runningWorker = null;
   private int maxThreads = 0;
   private TaskQueue<T> queue = null;
@@ -48,6 +51,7 @@
   private Date finishedWorking = null;
   private int completedTask = 0;
   private T lastCompletedTask = null;
+  private ExecutionState state = ExecutionState.INITALIZED;
 
   private TaskExceptionHandler exHandler;
   
@@ -60,13 +64,14 @@
     this.droid = droid;
     this.startedWorking = new Date();
     this.finishedWorking = null;
+    this.state = ExecutionState.RUNNING;
     
     int n = getMaxThreads();
     if (log.isInfoEnabled()) {
       log.info("Number of concurrent workers: " + n);
     }
     // start the pool
-    this.pool = new ThreadPoolExecutor(n, n, KEEP_ALIVE,
+    this.pool = new PausableThreadPoolExecutor(n, n, KEEP_ALIVE,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
     this.runningWorker = new ConcurrentHashMap<Long, WorkerRunner>();
     
@@ -75,51 +80,52 @@
   }
   
   
-  private void finishedWorker(long id) 
-  {
+  private void finishedWorker(WorkerRunner worker) 
+  { 
     synchronized (this) {
-      WorkerRunner worker = runningWorker.get(id);
-      if (null != worker) {
-        lastCompletedTask = worker.task;
-        
-        //int y = worker.getDepth() + 1;
-        pool.remove(worker);
-        runningWorker.remove(id);
-        if (log.isDebugEnabled()) {
-          log.debug("Worker '" + id + "' has finished.");
-        }
+      long id = worker.getId();
+      if( runningWorker.remove(id) == null ) {
+        throw new RuntimeException("should remove something!");
+      }
+      lastCompletedTask = worker.task;
+      
+      //int y = worker.getDepth() + 1;
+      pool.remove(worker);
+      if (log.isDebugEnabled()) {
+        log.debug("Worker '" + id + "' has finished.");
+      }
 
-        boolean terminate = false;
-        
-        Exception ex = worker.getException();
-        if (ex != null) {
-          TaskExceptionResult result = TaskExceptionResult.WARN;  
-          if (exHandler != null) {
-            result = exHandler.handleException(ex); 
-          }
-          switch (result) {
-          case WARN:
-            log.warn(ex.toString());
-            break;
-          case FATAL:
-            log.warn(ex.getMessage(), ex);
-            terminate = true;
-            break;
-          }
+      boolean terminate = false;
+      
+      Exception ex = worker.getException();
+      if (ex != null) {
+        TaskExceptionResult result = TaskExceptionResult.WARN;  
+        if (exHandler != null) {
+          result = exHandler.handleException(ex); 
+        }
+        switch (result) {
+        case WARN:
+          log.warn(ex.toString(), ex);
+          break;
+        case FATAL:
+          log.warn(ex.getMessage(), ex);
+          terminate = true;
+          break;
         }
+      }
+      
+      if (terminate || (runningWorker.size() == 0 & !queue.hasNext())) {
+        shutdownAndAwaitTermination();
         
-        if (terminate || (runningWorker.size() == 0 & !queue.hasNext())) {
-          shutdownAndAwaitTermination();
-          
-          long elapsed = System.currentTimeMillis() - startedWorking.getTime();
-          if (log.isInfoEnabled()) {
-            log.info("All threads have finished. (elapsed: " + elapsed + ")" );
-          }
-          finishedWorking = new Date();
-          droid.finished();
-        } else if (queue.hasNext()) {
-          startWorkers();
+        long elapsed = System.currentTimeMillis() - startedWorking.getTime();
+        if (log.isInfoEnabled()) {
+          log.info("All threads have finished. (elapsed: " + elapsed + ")" );
         }
+        finishedWorking = new Date();
+        state = ExecutionState.COMPLETE;
+        droid.finished();
+      } else if (queue.hasNext()) {
+        startWorkers();
       }
       completedTask++;
     }
@@ -172,6 +178,13 @@
    * @param maxThreads
    */
   public  void setMaxThreads(int maxThreads) {
+    if( pool != null && maxThreads != this.maxThreads ) { // resize only a pool that already exists, and only when the value actually changes
+      pool.setCorePoolSize( maxThreads ); // NOTE(review): only the core size changes; the pool is built with core == max, so maximumPoolSize keeps its original value -- confirm intended
+      
+      if( state == ExecutionState.RUNNING ) {
+        startWorkers(); // fill up the queue with new workers... NOTE(review): runs before this.maxThreads is assigned below, so startWorkers() would still see the old limit if it reads that field -- verify
+      }
+    }
     this.maxThreads = maxThreads;
   }
 
@@ -182,10 +195,10 @@
   public int getMaxThreads() {
     return maxThreads;
   }
-  
-  public boolean isWorking()
+
+  public ExecutionState getExecutionState() // replaces the removed isWorking(); reports the current lifecycle state instead of a boolean
   {
-    return startedWorking != null && finishedWorking == null;
+    return state; // NOTE(review): state is a plain (non-volatile) field read here without the locks its writers hold; SequentialTaskMaster declares its state volatile -- confirm visibility is acceptable
   }
 
   /**
@@ -220,11 +233,16 @@
    */
   class WorkerRunner extends Thread {
     
+    Date startTime;
     T task;
+    Worker<T> worker;
     Exception exception;
+    ExecutionState state;
     
     @Override
     public void run() {
+      startTime = new Date();
+      state = ExecutionState.RUNNING;
       try {
         task = queue.next();
         if( task != null ) {
@@ -232,29 +250,32 @@
             long delay = delayTimer.getDelayMillis();
             if( delay > 0 ) {
               try {
+                state = ExecutionState.PAUSED;
                 Thread.sleep( delay );
               } 
               catch (InterruptedException e) {} // we don't really care....
             }
+            state = ExecutionState.RUNNING;
           }
 
-          Worker<T> worker = droid.getNewWorker();
+          worker = droid.getNewWorker();
           try {
             worker.execute( task );
-          } catch (Exception ex) {
+          } 
+          catch (Exception ex) {
             exception = ex;
           }
         }
       }
       finally {
-        finishedWorker( getId() );
+        state = ExecutionState.COMPLETE;
+        finishedWorker( this );
       }
     }
     
     public Exception getException() {
       return exception;
     }
-    
   }
 
   public int getCompletedTasks() {
@@ -277,4 +298,55 @@
     pool.awaitTermination(timeout, unit);
   }
   
+
+  /**
+   * "pause" support: a ThreadPoolExecutor whose beforeExecute hook blocks the pool thread for as long as the enclosing task master's state is PAUSED.
+   */
+  private class PausableThreadPoolExecutor extends ThreadPoolExecutor {
+    ReentrantLock pauseLock = new ReentrantLock(); // held while checking for PAUSED below and while pause()/resume() change the state
+    Condition unpaused = pauseLock.newCondition(); // signalled by resume() to wake threads waiting in beforeExecute()
+
+    public PausableThreadPoolExecutor( // passes all arguments straight through to the ThreadPoolExecutor constructor
+          int corePoolSize,
+          int maximumPoolSize,
+          long keepAliveTime,
+          TimeUnit unit,
+          BlockingQueue<Runnable> workQueue) {
+      super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );
+    }
+
+    protected void beforeExecute(Thread t, Runnable r) { // ThreadPoolExecutor hook: called in thread t just before it runs r
+      super.beforeExecute(t, r);
+      pauseLock.lock();
+      try {
+        while (state == ExecutionState.PAUSED) unpaused.await(); // loop re-checks the state after every wake-up; only tasks about to start wait here, a task already executing is not stopped
+      } catch(InterruptedException ie) {
+        t.interrupt(); // restore the interrupt flag instead of swallowing it; the task then proceeds without waiting further
+      } finally {
+        pauseLock.unlock();
+      }
+    }
+  }
+
+  public void pause() { // marks the task master PAUSED so pool threads block in beforeExecute() before starting their next task
+    pool.pauseLock.lock(); // NOTE(review): pool is null until it is created when processing starts; calling pause() before that would throw NullPointerException -- confirm callers
+    try {
+      state = ExecutionState.PAUSED; // NOTE(review): the previous state is not checked, so a COMPLETE state would be overwritten as well
+    } finally {
+      pool.pauseLock.unlock();
+    }
+  }
+
+  public void resume() { // switches the state back to RUNNING and wakes every pool thread waiting in beforeExecute()
+    pool.pauseLock.lock(); // NOTE(review): pool is null until it is created when processing starts; calling resume() before that would throw NullPointerException -- confirm callers
+    try {
+      state = ExecutionState.RUNNING; // NOTE(review): set unconditionally, whatever the previous state was
+      pool.unpaused.signalAll(); // signalled while holding pauseLock, which the Condition requires
+    } finally {
+      pool.pauseLock.unlock();
+    }
+  }
+
 }
+
+

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java Thu Jan 15 16:18:59 2009
@@ -39,6 +39,7 @@
   private volatile Date finishedWorking = null;
   private volatile int completedTask = 0;
   private volatile T lastCompletedTask = null;
+  private volatile ExecutionState state = ExecutionState.INITALIZED;
   
   private DelayTimer delayTimer = null;
   private TaskExceptionHandler exHandler = null;
@@ -58,6 +59,7 @@
     this.startedWorking = new Date();
     this.finishedWorking = null;
     this.completedTask = 0;
+    this.state = ExecutionState.RUNNING;
     
     boolean terminated = false;
     while( !terminated ) {
@@ -96,6 +98,7 @@
       }
     }
     finishedWorking = new Date();
+    this.state = ExecutionState.COMPLETE;
     droid.finished();
     synchronized( mutex ) {
       completed = true;
@@ -154,5 +157,10 @@
         }
       }
     }
+  }
+
+  @Override
+  public ExecutionState getExecutionState() { // implements TaskMaster.getExecutionState() for the sequential task master
+    return state; // state is declared volatile in this class, so no lock is taken for the read
   }  
 }