You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to droids-commits@incubator.apache.org by ry...@apache.org on 2009/01/16 00:19:00 UTC
svn commit: r734867 - in
/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids:
api/PausableTaskMaster.java api/TaskMaster.java api/Worker.java
impl/MultiThreadedTaskMaster.java impl/SequentialTaskMaster.java
Author: ryan
Date: Thu Jan 15 16:18:59 2009
New Revision: 734867
URL: http://svn.apache.org/viewvc?rev=734867&view=rev
Log:
DROIDS-38 -- pausable task master
Added:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
Modified:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java
Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java?rev=734867&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/PausableTaskMaster.java Thu Jan 15 16:18:59 2009
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.api;
+
+
+/**
+ * A pausable task master interface
+ */
+public interface PausableTaskMaster<T extends Task> extends TaskMaster<T> {
+ void pause();
+ void resume();
+}
Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/TaskMaster.java Thu Jan 15 16:18:59 2009
@@ -16,7 +16,9 @@
*/
package org.apache.droids.api;
+import java.util.Collection;
import java.util.Date;
+import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -25,6 +27,12 @@
* Responsible for running all the tasks
*/
public interface TaskMaster<T extends Task> {
+ public enum ExecutionState {
+ INITALIZED,
+ RUNNING,
+ PAUSED,
+ COMPLETE
+ };
void processAllTasks( final TaskQueue<T> queue, final Droid<T> droid );
@@ -32,12 +40,11 @@
Date getFinishedWorking();
- boolean isWorking();
+ ExecutionState getExecutionState();
int getCompletedTasks();
T getLastCompletedTask();
void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
-
}
Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/Worker.java Thu Jan 15 16:18:59 2009
@@ -17,6 +17,7 @@
package org.apache.droids.api;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.droids.exception.DroidsException;
Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java Thu Jan 15 16:18:59 2009
@@ -17,27 +17,30 @@
package org.apache.droids.impl;
import java.util.Date;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.droids.api.DelayTimer;
import org.apache.droids.api.Droid;
+import org.apache.droids.api.PausableTaskMaster;
import org.apache.droids.api.Task;
import org.apache.droids.api.TaskExceptionHandler;
import org.apache.droids.api.TaskExceptionResult;
-import org.apache.droids.api.TaskMaster;
import org.apache.droids.api.TaskQueue;
import org.apache.droids.api.Worker;
import org.apache.droids.helper.Loggable;
public class MultiThreadedTaskMaster<T extends Task>
- extends Loggable implements TaskMaster<T>
+ extends Loggable implements PausableTaskMaster<T>
{
private static final long KEEP_ALIVE = 50000L;
- private ThreadPoolExecutor pool = null;
+ private PausableThreadPoolExecutor pool = null;
private ConcurrentHashMap<Long, WorkerRunner> runningWorker = null;
private int maxThreads = 0;
private TaskQueue<T> queue = null;
@@ -48,6 +51,7 @@
private Date finishedWorking = null;
private int completedTask = 0;
private T lastCompletedTask = null;
+ private ExecutionState state = ExecutionState.INITALIZED;
private TaskExceptionHandler exHandler;
@@ -60,13 +64,14 @@
this.droid = droid;
this.startedWorking = new Date();
this.finishedWorking = null;
+ this.state = ExecutionState.RUNNING;
int n = getMaxThreads();
if (log.isInfoEnabled()) {
log.info("Number of concurrent workers: " + n);
}
// start the pool
- this.pool = new ThreadPoolExecutor(n, n, KEEP_ALIVE,
+ this.pool = new PausableThreadPoolExecutor(n, n, KEEP_ALIVE,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
this.runningWorker = new ConcurrentHashMap<Long, WorkerRunner>();
@@ -75,51 +80,52 @@
}
- private void finishedWorker(long id)
- {
+ private void finishedWorker(WorkerRunner worker)
+ {
synchronized (this) {
- WorkerRunner worker = runningWorker.get(id);
- if (null != worker) {
- lastCompletedTask = worker.task;
-
- //int y = worker.getDepth() + 1;
- pool.remove(worker);
- runningWorker.remove(id);
- if (log.isDebugEnabled()) {
- log.debug("Worker '" + id + "' has finished.");
- }
+ long id = worker.getId();
+ if( runningWorker.remove(id) == null ) {
+ throw new RuntimeException("should remove something!");
+ }
+ lastCompletedTask = worker.task;
+
+ //int y = worker.getDepth() + 1;
+ pool.remove(worker);
+ if (log.isDebugEnabled()) {
+ log.debug("Worker '" + id + "' has finished.");
+ }
- boolean terminate = false;
-
- Exception ex = worker.getException();
- if (ex != null) {
- TaskExceptionResult result = TaskExceptionResult.WARN;
- if (exHandler != null) {
- result = exHandler.handleException(ex);
- }
- switch (result) {
- case WARN:
- log.warn(ex.toString());
- break;
- case FATAL:
- log.warn(ex.getMessage(), ex);
- terminate = true;
- break;
- }
+ boolean terminate = false;
+
+ Exception ex = worker.getException();
+ if (ex != null) {
+ TaskExceptionResult result = TaskExceptionResult.WARN;
+ if (exHandler != null) {
+ result = exHandler.handleException(ex);
+ }
+ switch (result) {
+ case WARN:
+ log.warn(ex.toString(), ex);
+ break;
+ case FATAL:
+ log.warn(ex.getMessage(), ex);
+ terminate = true;
+ break;
}
+ }
+
+ if (terminate || (runningWorker.size() == 0 & !queue.hasNext())) {
+ shutdownAndAwaitTermination();
- if (terminate || (runningWorker.size() == 0 & !queue.hasNext())) {
- shutdownAndAwaitTermination();
-
- long elapsed = System.currentTimeMillis() - startedWorking.getTime();
- if (log.isInfoEnabled()) {
- log.info("All threads have finished. (elapsed: " + elapsed + ")" );
- }
- finishedWorking = new Date();
- droid.finished();
- } else if (queue.hasNext()) {
- startWorkers();
+ long elapsed = System.currentTimeMillis() - startedWorking.getTime();
+ if (log.isInfoEnabled()) {
+ log.info("All threads have finished. (elapsed: " + elapsed + ")" );
}
+ finishedWorking = new Date();
+ state = ExecutionState.COMPLETE;
+ droid.finished();
+ } else if (queue.hasNext()) {
+ startWorkers();
}
completedTask++;
}
@@ -172,6 +178,13 @@
* @param maxThreads
*/
public void setMaxThreads(int maxThreads) {
+ if( pool != null && maxThreads != this.maxThreads ) {
+ pool.setCorePoolSize( maxThreads );
+
+ if( state == ExecutionState.RUNNING ) {
+ startWorkers(); // fill up the queue with new workers...
+ }
+ }
this.maxThreads = maxThreads;
}
@@ -182,10 +195,10 @@
public int getMaxThreads() {
return maxThreads;
}
-
- public boolean isWorking()
+
+ public ExecutionState getExecutionState()
{
- return startedWorking != null && finishedWorking == null;
+ return state;
}
/**
@@ -220,11 +233,16 @@
*/
class WorkerRunner extends Thread {
+ Date startTime;
T task;
+ Worker<T> worker;
Exception exception;
+ ExecutionState state;
@Override
public void run() {
+ startTime = new Date();
+ state = ExecutionState.RUNNING;
try {
task = queue.next();
if( task != null ) {
@@ -232,29 +250,32 @@
long delay = delayTimer.getDelayMillis();
if( delay > 0 ) {
try {
+ state = ExecutionState.PAUSED;
Thread.sleep( delay );
}
catch (InterruptedException e) {} // we don't really care....
}
+ state = ExecutionState.RUNNING;
}
- Worker<T> worker = droid.getNewWorker();
+ worker = droid.getNewWorker();
try {
worker.execute( task );
- } catch (Exception ex) {
+ }
+ catch (Exception ex) {
exception = ex;
}
}
}
finally {
- finishedWorker( getId() );
+ state = ExecutionState.COMPLETE;
+ finishedWorker( this );
}
}
public Exception getException() {
return exception;
}
-
}
public int getCompletedTasks() {
@@ -277,4 +298,55 @@
pool.awaitTermination(timeout, unit);
}
+
+ /**
+ * "pause" support
+ */
+ private class PausableThreadPoolExecutor extends ThreadPoolExecutor {
+ ReentrantLock pauseLock = new ReentrantLock();
+ Condition unpaused = pauseLock.newCondition();
+
+ public PausableThreadPoolExecutor(
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );
+ }
+
+ protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t, r);
+ pauseLock.lock();
+ try {
+ while (state == ExecutionState.PAUSED) unpaused.await();
+ } catch(InterruptedException ie) {
+ t.interrupt();
+ } finally {
+ pauseLock.unlock();
+ }
+ }
+ }
+
+ public void pause() {
+ pool.pauseLock.lock();
+ try {
+ state = ExecutionState.PAUSED;
+ } finally {
+ pool.pauseLock.unlock();
+ }
+ }
+
+ public void resume() {
+ pool.pauseLock.lock();
+ try {
+ state = ExecutionState.RUNNING;
+ pool.unpaused.signalAll();
+ } finally {
+ pool.pauseLock.unlock();
+ }
+ }
+
}
+
+
Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java?rev=734867&r1=734866&r2=734867&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/SequentialTaskMaster.java Thu Jan 15 16:18:59 2009
@@ -39,6 +39,7 @@
private volatile Date finishedWorking = null;
private volatile int completedTask = 0;
private volatile T lastCompletedTask = null;
+ private volatile ExecutionState state = ExecutionState.INITALIZED;
private DelayTimer delayTimer = null;
private TaskExceptionHandler exHandler = null;
@@ -58,6 +59,7 @@
this.startedWorking = new Date();
this.finishedWorking = null;
this.completedTask = 0;
+ this.state = ExecutionState.RUNNING;
boolean terminated = false;
while( !terminated ) {
@@ -96,6 +98,7 @@
}
}
finishedWorking = new Date();
+ this.state = ExecutionState.COMPLETE;
droid.finished();
synchronized( mutex ) {
completed = true;
@@ -154,5 +157,10 @@
}
}
}
+ }
+
+ @Override
+ public ExecutionState getExecutionState() {
+ return state;
}
}