You are viewing a plain text version of this content. The canonical link for it is here.
Posted to droids-commits@incubator.apache.org by ry...@apache.org on 2009/02/09 05:08:32 UTC

svn commit: r742264 - in /incubator/droids/trunk/droids-core/src/main/java/org/apache/droids: api/WorkMonitor.java impl/MultiThreadedTaskMaster.java monitor/ monitor/SimpleWorkMonitor.java monitor/WorkBean.java

Author: ryan
Date: Mon Feb  9 05:08:31 2009
New Revision: 742264

URL: http://svn.apache.org/viewvc?rev=742264&view=rev
Log:
DROIDS-40 -- adding a work monitor to help keep track of what is hapening

Added:
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java
Modified:
    incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java

Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java?rev=742264&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/api/WorkMonitor.java Mon Feb  9 05:08:31 2009
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.api;
+
+
+public interface WorkMonitor<T extends Task> {
+  void beforeExecute( final T task, final Worker<T> worker );
+  void afterExecute( final T task, final Worker<T> worker, Exception ex );
+}

Modified: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java?rev=742264&r1=742263&r2=742264&view=diff
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java (original)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/impl/MultiThreadedTaskMaster.java Mon Feb  9 05:08:31 2009
@@ -32,6 +32,7 @@
 import org.apache.droids.api.TaskExceptionHandler;
 import org.apache.droids.api.TaskExceptionResult;
 import org.apache.droids.api.TaskQueue;
+import org.apache.droids.api.WorkMonitor;
 import org.apache.droids.api.Worker;
 import org.apache.droids.helper.Loggable;
 
@@ -45,6 +46,7 @@
   private TaskQueue<T> queue = null;
   private Droid<T> droid = null;
   private DelayTimer delayTimer = null;
+  private WorkMonitor<T> monitor = null;
 
   private Date startedWorking = null;
   private Date finishedWorking = null;
@@ -63,6 +65,12 @@
     this.startedWorking = new Date();
     this.finishedWorking = null;
     this.state = ExecutionState.RUNNING;
+
+    if( !queue.hasNext() ) {
+      log.info( "no tasks. finishing now" );
+      terminate();
+      return;
+    }
     
     int n = getMaxThreads();
     if (log.isInfoEnabled()) {
@@ -105,22 +113,29 @@
         }
       }
       
-      if (terminate || (pool.getActiveCount() == 0 && !queue.hasNext()) ) {
-        shutdownAndAwaitTermination();
-        
-        long elapsed = System.currentTimeMillis() - startedWorking.getTime();
-        if (log.isInfoEnabled()) {
-          log.info("All threads have finished. (elapsed: " + elapsed + ")" );
-        }
-        finishedWorking = new Date();
-        state = ExecutionState.COMPLETE;
-        droid.finished();
-      } else if (queue.hasNext()) {
+      if (terminate) {
+        terminate();
+      } 
+      else {
         startWorkers();
       }
     }
   }
   
+  private void terminate() 
+  {
+    if( pool != null ) {
+      shutdownAndAwaitTermination();
+    }
+    
+    long elapsed = System.currentTimeMillis() - startedWorking.getTime();
+    if (log.isInfoEnabled()) {
+      log.info("All threads have finished. (elapsed: " + elapsed + ")" );
+    }
+    finishedWorking = new Date();
+    state = ExecutionState.COMPLETE;
+    droid.finished();
+  }
 
   /**
    * Will start a new worker.
@@ -128,28 +143,30 @@
    */
   private void startWorkers(){
     
+    int cnt = 0;
     try {
-      while( queue.hasNext() ) {
-        // The getActiveCount is often wrong for only a single thread!
-        // the javadocs say the number is approxiate, so in the case we only 
-        // see one active worker, we will assume there are none...
-        int activeWorkers = pool.getActiveCount();
-        if( activeWorkers == 1 ) { //&& runningWorker.isEmpty() ) {
-          activeWorkers = 0;
-        }
-        if( activeWorkers >= getMaxThreads() ) {
-          return;  // don't make a new runner...
-        }
-        WorkerRunner runner = new WorkerRunner();
-        if (log.isDebugEnabled()) {
-          log.debug("Starting worker '" + runner + "'");
-        }
-        pool.execute(runner);
+      while( queue.hasNext() && cnt++ < maxThreads ) {
+        // checking the "activeCount" can be expensive...
+//        // The getActiveCount is often wrong for only a single thread!
+//        // the javadocs say the number is approxiate, so in the case we only 
+//        // see one active worker, we will assume there are none...
+//        int activeWorkers = pool.getActiveCount();
+//        if( activeWorkers == 1 ) { //&& runningWorker.isEmpty() ) {
+//          activeWorkers = 0;
+//        }
+//        if( activeWorkers >= getMaxThreads() ) {
+//          return;  // don't make a new runner...
+//        }
+        pool.execute( new WorkerRunner() );
       }
     }
     catch( RejectedExecutionException ex ) {
       log.info( "humm", ex );
     }
+    
+    if( cnt == 0 && !queue.hasNext() ) {
+      terminate(); // nothing to do, we must be done...
+    }
   }
 
 
@@ -167,6 +184,14 @@
     this.delayTimer = delayTimer;
   }
   
+  public WorkMonitor<T> getMonitor() {
+    return monitor;
+  }
+
+  public void setMonitor(WorkMonitor<T> monitor) {
+    this.monitor = monitor;
+  }
+  
   /**
    * Adjust number of allowed threads
    * @param maxThreads
@@ -202,6 +227,9 @@
    */
   protected void shutdownAndAwaitTermination() {
     log.info("SHUTTING DOWN");
+    if( pool == null ) {
+      return;
+    }
     pool.shutdown(); // Disable new tasks from being submitted
     try {
       // Wait a while for existing tasks to terminate
@@ -246,10 +274,19 @@
 
           Worker<T> worker = droid.getNewWorker();
           try {
+            if( monitor != null ) {
+              monitor.beforeExecute(task, worker);
+            }
             worker.execute( task );
+            if( monitor != null ) {
+              monitor.afterExecute(task, worker, null);
+            }
           }
           catch (Exception ex) {
             exception = ex;
+            if( monitor != null ) {
+              monitor.afterExecute(task, worker, ex);
+            }
           }
         }
       }
@@ -311,6 +348,9 @@
   }
 
   public void pause() {
+    if( pool == null ) {
+      return;
+    }
     pool.pauseLock.lock();
     try {
       state = ExecutionState.PAUSED;
@@ -320,6 +360,9 @@
   }
 
   public void resume() {
+    if( pool == null ) {
+      return;
+    }
     pool.pauseLock.lock();
     try {
       state = ExecutionState.RUNNING;

Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java?rev=742264&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/SimpleWorkMonitor.java Mon Feb  9 05:08:31 2009
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.monitor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.droids.api.Task;
+import org.apache.droids.api.WorkMonitor;
+import org.apache.droids.api.Worker;
+
+/**
+ * A simple 
+ */
+public class SimpleWorkMonitor<T extends Task> implements WorkMonitor<T> {
+  
+  Map<T,WorkBean<T>> working = new ConcurrentHashMap<T, WorkBean<T>>();
+  
+  @Override
+  public void beforeExecute(T task, Worker<T> worker) {
+    WorkBean<T> bean = new WorkBean<T>( task, worker );
+    working.put( task, bean );
+  }
+
+  @Override
+  public void afterExecute(T task, Worker<T> worker, Exception ex) {
+    working.remove( task );
+  }
+  
+  public Collection<WorkBean<T>> getRunningTasks()
+  {
+    return working.values();
+  }
+  
+  public WorkBean<T> getWorkBean( T task )
+  {
+    return working.get( task );
+  }
+}

Added: incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java
URL: http://svn.apache.org/viewvc/incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java?rev=742264&view=auto
==============================================================================
--- incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java (added)
+++ incubator/droids/trunk/droids-core/src/main/java/org/apache/droids/monitor/WorkBean.java Mon Feb  9 05:08:31 2009
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.monitor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.droids.api.*;
+import org.apache.droids.exception.InvalidTaskException;
+
+/**
+ * A simple 
+ */
+public class WorkBean<T extends Task> {
+
+  Date startTime;
+
+  T task;
+  Worker<T> worker;
+  Exception exception;
+  
+  public WorkBean( T task, Worker<T> w )
+  {
+    this.startTime = new Date();
+    this.task = task;
+    this.worker = w;
+  }
+
+  public Date getStartTime() {
+    return startTime;
+  }
+
+  public T getTask() {
+    return task;
+  }
+
+  public Worker<T> getWorker() {
+    return worker;
+  }
+
+  public Exception getException() {
+    return exception;
+  }
+}