You are viewing a plain text version of this content. The canonical link for it is here.
Posted to droids-commits@incubator.apache.org by ry...@apache.org on 2009/02/09 05:08:32 UTC
svn commit: r742264 - in
/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids:
api/WorkMonitor.java impl/MultiThreadedTaskMaster.java monitor/
monitor/SimpleWorkMonitor.java monitor/WorkBean.java
Author: ryan
Date: Mon Feb 9 05:08:31 2009
New Revision: 742264
URL: http://svn.apache.org/viewvc?rev=742264&view=rev
Log:
DROIDS-40 -- adding a work monitor to help keep track of what is hapening
Added:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java
Modified:
incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java?rev=742264&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java Mon Feb 9 05:08:31 2009
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.api;
+
+
+public interface WorkMonitor<T extends Task> {
+ void beforeExecute( final T task, final Worker<T> worker );
+ void afterExecute( final T task, final Worker<T> worker, Exception ex );
+}
Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=742264&r1=742263&r2=742264&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java Mon Feb 9 05:08:31 2009
@@ -32,6 +32,7 @@
import org.apache.droids.api.TaskExceptionHandler;
import org.apache.droids.api.TaskExceptionResult;
import org.apache.droids.api.TaskQueue;
+import org.apache.droids.api.WorkMonitor;
import org.apache.droids.api.Worker;
import org.apache.droids.helper.Loggable;
@@ -45,6 +46,7 @@
private TaskQueue<T> queue = null;
private Droid<T> droid = null;
private DelayTimer delayTimer = null;
+ private WorkMonitor<T> monitor = null;
private Date startedWorking = null;
private Date finishedWorking = null;
@@ -63,6 +65,12 @@
this.startedWorking = new Date();
this.finishedWorking = null;
this.state = ExecutionState.RUNNING;
+
+ if( !queue.hasNext() ) {
+ log.info( "no tasks. finishing now" );
+ terminate();
+ return;
+ }
int n = getMaxThreads();
if (log.isInfoEnabled()) {
@@ -105,22 +113,29 @@
}
}
- if (terminate || (pool.getActiveCount() == 0 && !queue.hasNext()) ) {
- shutdownAndAwaitTermination();
-
- long elapsed = System.currentTimeMillis() - startedWorking.getTime();
- if (log.isInfoEnabled()) {
- log.info("All threads have finished. (elapsed: " + elapsed + ")" );
- }
- finishedWorking = new Date();
- state = ExecutionState.COMPLETE;
- droid.finished();
- } else if (queue.hasNext()) {
+ if (terminate) {
+ terminate();
+ }
+ else {
startWorkers();
}
}
}
+ private void terminate()
+ {
+ if( pool != null ) {
+ shutdownAndAwaitTermination();
+ }
+
+ long elapsed = System.currentTimeMillis() - startedWorking.getTime();
+ if (log.isInfoEnabled()) {
+ log.info("All threads have finished. (elapsed: " + elapsed + ")" );
+ }
+ finishedWorking = new Date();
+ state = ExecutionState.COMPLETE;
+ droid.finished();
+ }
/**
* Will start a new worker.
@@ -128,28 +143,30 @@
*/
private void startWorkers(){
+ int cnt = 0;
try {
- while( queue.hasNext() ) {
- // The getActiveCount is often wrong for only a single thread!
- // the javadocs say the number is approxiate, so in the case we only
- // see one active worker, we will assume there are none...
- int activeWorkers = pool.getActiveCount();
- if( activeWorkers == 1 ) { //&& runningWorker.isEmpty() ) {
- activeWorkers = 0;
- }
- if( activeWorkers >= getMaxThreads() ) {
- return; // don't make a new runner...
- }
- WorkerRunner runner = new WorkerRunner();
- if (log.isDebugEnabled()) {
- log.debug("Starting worker '" + runner + "'");
- }
- pool.execute(runner);
+ while( queue.hasNext() && cnt++ < maxThreads ) {
+ // checking the "activeCount" can be expensive...
+// // The getActiveCount is often wrong for only a single thread!
+// // the javadocs say the number is approxiate, so in the case we only
+// // see one active worker, we will assume there are none...
+// int activeWorkers = pool.getActiveCount();
+// if( activeWorkers == 1 ) { //&& runningWorker.isEmpty() ) {
+// activeWorkers = 0;
+// }
+// if( activeWorkers >= getMaxThreads() ) {
+// return; // don't make a new runner...
+// }
+ pool.execute( new WorkerRunner() );
}
}
catch( RejectedExecutionException ex ) {
log.info( "humm", ex );
}
+
+ if( cnt == 0 && !queue.hasNext() ) {
+ terminate(); // nothing to do, we must be done...
+ }
}
@@ -167,6 +184,14 @@
this.delayTimer = delayTimer;
}
+ public WorkMonitor<T> getMonitor() {
+ return monitor;
+ }
+
+ public void setMonitor(WorkMonitor<T> monitor) {
+ this.monitor = monitor;
+ }
+
/**
* Adjust number of allowed threads
* @param maxThreads
@@ -202,6 +227,9 @@
*/
protected void shutdownAndAwaitTermination() {
log.info("SHUTTING DOWN");
+ if( pool == null ) {
+ return;
+ }
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
@@ -246,10 +274,19 @@
Worker<T> worker = droid.getNewWorker();
try {
+ if( monitor != null ) {
+ monitor.beforeExecute(task, worker);
+ }
worker.execute( task );
+ if( monitor != null ) {
+ monitor.afterExecute(task, worker, null);
+ }
}
catch (Exception ex) {
exception = ex;
+ if( monitor != null ) {
+ monitor.afterExecute(task, worker, ex);
+ }
}
}
}
@@ -311,6 +348,9 @@
}
public void pause() {
+ if( pool == null ) {
+ return;
+ }
pool.pauseLock.lock();
try {
state = ExecutionState.PAUSED;
@@ -320,6 +360,9 @@
}
public void resume() {
+ if( pool == null ) {
+ return;
+ }
pool.pauseLock.lock();
try {
state = ExecutionState.RUNNING;
Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java?rev=742264&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java Mon Feb 9 05:08:31 2009
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.monitor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.droids.api.Task;
+import org.apache.droids.api.WorkMonitor;
+import org.apache.droids.api.Worker;
+
+/**
+ * A simple
+ */
+public class SimpleWorkMonitor<T extends Task> implements WorkMonitor<T> {
+
+ Map<T,WorkBean<T>> working = new ConcurrentHashMap<T, WorkBean<T>>();
+
+ @Override
+ public void beforeExecute(T task, Worker<T> worker) {
+ WorkBean<T> bean = new WorkBean<T>( task, worker );
+ working.put( task, bean );
+ }
+
+ @Override
+ public void afterExecute(T task, Worker<T> worker, Exception ex) {
+ working.remove( task );
+ }
+
+ public Collection<WorkBean<T>> getRunningTasks()
+ {
+ return working.values();
+ }
+
+ public WorkBean<T> getWorkBean( T task )
+ {
+ return working.get( task );
+ }
+}
Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java?rev=742264&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java Mon Feb 9 05:08:31 2009
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.monitor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.droids.api.*;
+import org.apache.droids.exception.InvalidTaskException;
+
+/**
+ * A simple
+ */
+public class WorkBean<T extends Task> {
+
+ Date startTime;
+
+ T task;
+ Worker<T> worker;
+ Exception exception;
+
+ public WorkBean( T task, Worker<T> w )
+ {
+ this.startTime = new Date();
+ this.task = task;
+ this.worker = w;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public T getTask() {
+ return task;
+ }
+
+ public Worker<T> getWorker() {
+ return worker;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+}