You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/18 06:47:11 UTC

svn commit: r1374513 - in /oodt/trunk/workflow/src: main/java/org/apache/oodt/cas/workflow/engine/ main/java/org/apache/oodt/cas/workflow/engine/runner/ test/org/apache/oodt/cas/workflow/engine/

Author: mattmann
Date: Sat Aug 18 04:47:10 2012
New Revision: 1374513

URL: http://svn.apache.org/viewvc?rev=1374513&view=rev
Log:
- OODT-310 WIP: start fleshing out the PQueueWorkflowEngine

Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Sat Aug 18 04:47:10 2012
@@ -42,26 +42,20 @@ import org.apache.oodt.cas.workflow.stru
 import org.apache.oodt.commons.util.DateConvert;
 
 /**
- *
+ * 
  * Describe your class here.
+ * 
  * @author mattmann
  * @author bfoster
  * @version $Revision$
- *
+ * 
  */
 public class PrioritizedQueueBasedWorkflowEngine implements WorkflowEngine {
 
-
-  private static final Logger LOG = Logger.getLogger(PrioritizedQueueBasedWorkflowEngine.class.getName());
-
-  //private Map<String, WorkflowPro> processorQueue;
-  private final List<WorkflowProcessor> runnableTasks;
-  private final Map<String, WorkflowProcessor> executingTasks;
-  //private WorkflowProcessorLock processorLock;
-  private List<String> metadataKeysToCache;
-  private boolean debugMode;
-  private final boolean allowQueuerToWork;
+  private static final Logger LOG = Logger
+      .getLogger(PrioritizedQueueBasedWorkflowEngine.class.getName());
   private final Thread queuerThread;
+  private final Thread runnerThread;
   private final WorkflowInstanceRepository repo;
   private final PrioritySorter prioritizer;
   private WorkflowProcessorQueue processorQueue;
@@ -69,49 +63,60 @@ public class PrioritizedQueueBasedWorkfl
   private final long conditionWait;
   private EngineRunner runner;
 
-  public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo, PrioritySorter prioritizer, long conditionWait){
+  public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo,
+      PrioritySorter prioritizer, long conditionWait) {
     this.repo = repo;
-    this.prioritizer = prioritizer != null ? new HighestFIFOPrioritySorter(1, 50, 1/*secondsBetweenBoosts, boostAmount, boostCap*/):
-      prioritizer;
+    this.prioritizer = prioritizer != null ? new HighestFIFOPrioritySorter(1,
+        50, 1) : prioritizer;
     this.wmgrUrl = null;
     this.conditionWait = conditionWait;
     this.processorQueue = new WorkflowProcessorQueue();
-    this.runnableTasks = new Vector<WorkflowProcessor>();
-    this.executingTasks = Collections.synchronizedMap(new HashMap<String, WorkflowProcessor>());
-    //this.processorLock = new WorkflowProcessorLock();
-    if (metadataKeysToCache != null)
-      this.metadataKeysToCache = new Vector<String>(metadataKeysToCache);
-//    this.debugMode = debugMode;
-    this.allowQueuerToWork = true;
-
-   /* try {
-      this.loadProcessorRepo();
-    }catch (Exception e) {
-      e.printStackTrace();
-    }*/
+
 
     // Task QUEUER thread
-    queuerThread = new Thread(new TaskQuerier(processorQueue, prioritizer));
+    TaskQuerier querier = new TaskQuerier(processorQueue, this.prioritizer);
+    queuerThread = new Thread(querier);
     queuerThread.start();
+
+    // Task Runner thread
+    runnerThread = new Thread(new TaskRunner(querier, runner));
+    runnerThread.start();
+
   }
 
   @Override
   public void setEngineRunner(EngineRunner runner) {
-     this.runner = runner;
+    this.runner = runner;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache.oodt.cas.workflow.structs.Workflow, org.apache.oodt.cas.metadata.Metadata)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache
+   * .oodt.cas.workflow.structs.Workflow, org.apache.oodt.cas.metadata.Metadata)
    */
   @Override
   public WorkflowInstance startWorkflow(Workflow workflow, Metadata metadata)
       throws EngineException {
     // TODO Auto-generated method stub
+    
+    //looks like the work to do here is
+    // create a new WorkflowInstance
+    // create a new WorkflowProcessor around it
+    // set it in Queued status
+    // commit it to workflow instance repo and it will get picked up 
+    // by the runner thread
+    
     return null;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang.String)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
+   * .String)
    */
   @Override
   public void stopWorkflow(String workflowInstId) {
@@ -119,8 +124,12 @@ public class PrioritizedQueueBasedWorkfl
 
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance(java.lang.String)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
+   * (java.lang.String)
    */
   @Override
   public void pauseWorkflowInstance(String workflowInstId) {
@@ -128,8 +137,12 @@ public class PrioritizedQueueBasedWorkfl
 
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance(java.lang.String)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
+   * (java.lang.String)
    */
   @Override
   public void resumeWorkflowInstance(String workflowInstId) {
@@ -137,17 +150,23 @@ public class PrioritizedQueueBasedWorkfl
 
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
    */
   @Override
   public WorkflowInstanceRepository getInstanceRepository() {
-    // TODO Auto-generated method stub
-    return null;
+    return this.repo;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.lang.String, org.apache.oodt.cas.metadata.Metadata)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.
+   * lang.String, org.apache.oodt.cas.metadata.Metadata)
    */
   @Override
   public boolean updateMetadata(String workflowInstId, Metadata met) {
@@ -155,8 +174,12 @@ public class PrioritizedQueueBasedWorkfl
     return false;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl(java.net.URL)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl
+   * (java.net.URL)
    */
   @Override
   public void setWorkflowManagerUrl(URL url) {
@@ -164,8 +187,12 @@ public class PrioritizedQueueBasedWorkfl
 
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(java.lang.String)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
+   * java.lang.String)
    */
   @Override
   public double getWallClockMinutes(String workflowInstId) {
@@ -173,8 +200,11 @@ public class PrioritizedQueueBasedWorkfl
     return 0;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getCurrentTaskWallClockMinutes(java.lang.String)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#
+   * getCurrentTaskWallClockMinutes(java.lang.String)
    */
   @Override
   public double getCurrentTaskWallClockMinutes(String workflowInstId) {
@@ -182,8 +212,12 @@ public class PrioritizedQueueBasedWorkfl
     return 0;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata(java.lang.String)
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata
+   * (java.lang.String)
    */
   @Override
   public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
@@ -191,8 +225,6 @@ public class PrioritizedQueueBasedWorkfl
     return null;
   }
 
+  // FIXME: add in methods from WEngine
 
-   //FIXME: add in methods from WEngine
-
-  }
-
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Sat Aug 18 04:47:10 2012
@@ -65,8 +65,7 @@ public class TaskRunner implements Runna
   private static final Logger LOG = Logger
       .getLogger(TaskRunner.class.getName());
 
-  public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner,
-      int waitSeconds) {
+  public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner) {
     this.running = true;
     this.taskQuerier = taskQuerier;
     this.runner = runner;
@@ -89,7 +88,6 @@ public class TaskRunner implements Runna
 
       try {
         if (nextTaskProcessor != null && runner.hasOpenSlots(nextTask)) {
-          // TODO: set Workflow met here?
           runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
         }
       } catch (Exception e) {

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java Sat Aug 18 04:47:10 2012
@@ -21,10 +21,8 @@ package org.apache.oodt.cas.workflow.eng
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -47,7 +45,7 @@ public class AsynchronousLocalEngineRunn
 
   public static final int DEFAULT_NUM_THREADS = 25;
 
-  private final ThreadPoolExecutor executor;
+  private final ExecutorService executor;
   private final Map<String, Thread> workerMap;
 
   public AsynchronousLocalEngineRunner() {
@@ -55,17 +53,7 @@ public class AsynchronousLocalEngineRunn
   }
 
   public AsynchronousLocalEngineRunner(int numThreads) {
-    this.executor = new ThreadPoolExecutor(numThreads, numThreads, 30,
-        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
-        new RejectedExecutionHandler() {
-
-          @Override
-          public void rejectedExecution(Runnable workflow,
-              ThreadPoolExecutor executor) {
-            // TODO Auto-generated method stub
-
-          }
-        });
+    this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
     this.workerMap = new HashMap<String, Thread>();
   }
 
@@ -110,7 +98,7 @@ public class AsynchronousLocalEngineRunn
 
     };
 
-    String id = "";
+    String id = UUID.randomUUID().toString();
     synchronized (id) {
       id = UUID.randomUUID().toString();
       this.workerMap.put(id, worker);

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java Sat Aug 18 04:47:10 2012
@@ -63,7 +63,7 @@ public class TestTaskRunner extends Test
     assertNotNull(runnables);
     assertEquals(2, runnables.size());
     runner = new AsynchronousLocalEngineRunner();
-    taskRunner = new TaskRunner(querier, runner, 2);
+    taskRunner = new TaskRunner(querier, runner);
     assertNotNull(taskRunner);
     Thread runnerThread = new Thread(taskRunner);
     WorkflowTask task = taskRunner