You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/01 07:16:41 UTC

svn commit: r1367859 - in /oodt/trunk/workflow/src: main/java/org/apache/oodt/cas/workflow/engine/ test/org/apache/oodt/cas/workflow/engine/

Author: mattmann
Date: Wed Aug  1 05:16:41 2012
New Revision: 1367859

URL: http://svn.apache.org/viewvc?rev=1367859&view=rev
Log:
- OODT-310: WIP: Port WEngine to trunk

Added:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java
Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java Wed Aug  1 05:16:41 2012
@@ -133,4 +133,13 @@ public class AsynchronousLocalEngineRunn
 
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+   */
+  @Override
+  public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+    // TODO Auto-generated method stub
+    return true;
+  }
+
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java Wed Aug  1 05:16:41 2012
@@ -57,5 +57,15 @@ public abstract class EngineRunner {
    * 
    */
   public abstract void shutdown() throws Exception;
+  
+  /**
+   * Decides whether or not there are available slots within this runner
+   * to execute the provided {@link WorkflowTask}.
+   * 
+   * @param workflowTask The {@link WorkflowTask} to execute.
+   * @return True if there is an open slot, false otherwise.
+   * @throws Exception If any error occurs.
+   */
+  public abstract boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception;  
 
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Wed Aug  1 05:16:41 2012
@@ -71,7 +71,7 @@ public class PrioritizedQueueBasedWorkfl
       prioritizer;
     this.wmgrUrl = null;
     this.conditionWait = conditionWait;
-    //this.processorQueue = Collections.synchronizedMap(new HashMap<String, CachedWorkflowProcessor>());
+    this.processorQueue = new WorkflowProcessorQueue();
     this.runnableTasks = new Vector<WorkflowProcessor>();
     this.executingTasks = Collections.synchronizedMap(new HashMap<String, WorkflowProcessor>());
     //this.processorLock = new WorkflowProcessorLock();

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java Wed Aug  1 05:16:41 2012
@@ -107,6 +107,17 @@ public class ResourceRunner extends Engi
     // TODO Auto-generated method stub
 
   }
+  
+
+  /* (non-Javadoc)
+   * @see org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+   */
+  @Override
+  public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+    // TODO Auto-generated method stub
+    return false;
+  }
+  
 
   protected boolean safeCheckJobComplete(String jobId) {
     try {

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java Wed Aug  1 05:16:41 2012
@@ -77,4 +77,13 @@ public class SynchronousLocalEngineRunne
 
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+   */
+  @Override
+  public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
 }

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java Wed Aug  1 05:16:41 2012
@@ -90,13 +90,13 @@ public class TaskQuerier implements Runn
               tp.setState(lifecycle.createState("Executing", "running",
                   "Added to Runnable queue"));
               System.out.println("Added processor with priority: ["+tp.getPriority()+"]");
-              processorsToRun.add(processor);
+              processorsToRun.add(tp);
             }
             
             prioritizer.sort(processorsToRun);
             
             synchronized(runnableProcessors){
-              runnableProcessors = processorsToRun;
+              if(running) runnableProcessors = processorsToRun;
             }
 
         } else {
@@ -127,6 +127,19 @@ public class TaskQuerier implements Runn
   public List<WorkflowProcessor> getRunnableProcessors() {
     return runnableProcessors;
   }
+  
+  /**
+   * Gets the next available {@link TaskProcessor} from the {@link List}
+   * of {@link #runnableProcessors}. Removes that {@link TaskProcessor}
+   * from the actual {@link #runnableProcessors} {@link List}.
+   * 
+   * @return The next available {@link TaskProcessor} from the {@link List}
+   * of {@link #runnableProcessors}.
+   */
+  public TaskProcessor getNext(){
+    if(getRunnableProcessors().size() == 0) return null;
+    return (TaskProcessor)getRunnableProcessors().remove(0);
+  }
 
   private WorkflowLifecycle getLifecycleForProcessor(WorkflowProcessor processor) {
     if (processor.getWorkflowInstance() != null

Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1367859&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Wed Aug  1 05:16:41 2012
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ * 
+ * Implements the TaskRunner framework. Acts as a thread that works with the
+ * TaskQuerier to take the next sorted (aka ones that have been sorted with the
+ * Workflow PrioritySorter) task and then leverage the Engine's Runner to
+ * execute the task.
+ * 
+ * The TaskRunner thread first pops a task off the list using
+ * {@link TaskQuerier#getNext()} and then so long as the thread's
+ * {@link #runner} has open slots as returned by
+ * {@link EngineRunner#hasOpenSlots(WorkflowTask)}, and {@link #isPause()} is
+ * false and {@link #isRunning()} is true, then the task is handed off to the
+ * runner for execution.
+ * 
+ * The TaskRunner thread can be paused during which time it waits
+ * {@link #waitSeconds} seconds, wakes up to see if it's unpaused, and then goes
+ * back to sleep if not, otherwise, resumes executing if it was unpaused.
+ * 
+ * @since Apache OODT 0.5
+ * 
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ * 
+ */
+public class TaskRunner implements Runnable {
+
+  private boolean running;
+
+  private boolean pause;
+
+  private TaskQuerier taskQuerier;
+
+  private EngineRunner runner;
+
+  private int waitSeconds;
+
+  private static final Logger LOG = Logger
+      .getLogger(TaskRunner.class.getName());
+
+  public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner,
+      int waitSeconds) {
+    this.running = true;
+    this.pause = false;
+    this.taskQuerier = taskQuerier;
+    this.runner = runner;
+    this.waitSeconds = waitSeconds;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see java.lang.Runnable#run()
+   */
+  @Override
+  public void run() {
+    WorkflowTask nextTask = null;
+    TaskProcessor nextTaskProcessor = null;
+
+    while (running) {
+      try {
+        if (nextTaskProcessor == null){
+          nextTaskProcessor = taskQuerier.getNext();          
+          nextTask = extractTaskFromProcessor(nextTaskProcessor);
+        }
+        while (running && !pause && nextTask != null
+            && runner.hasOpenSlots(nextTask)) {
+          // TODO: set Workflow met here?
+          runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
+          nextTaskProcessor = taskQuerier.getNext();
+          nextTask = extractTaskFromProcessor(nextTaskProcessor);
+
+          // take a breather
+          try{
+            Thread.currentThread().sleep(1000); //FIXME: make this configurable
+          }
+          catch (Exception ignore) {}
+        }
+      } catch (Exception e) {
+        LOG.log(
+            Level.SEVERE,
+            "Engine failed while submitting jobs to its runner : "
+                + e.getMessage(), e);
+        if (nextTask != null) {
+          nextTaskProcessor.setState(nextTaskProcessor
+              .getLifecycleManager()
+              .getDefaultLifecycle()
+              .createState("Failure", "done",
+                  "Failed while submitting job to Runner : " + e.getMessage()));
+          nextTask = null;
+          nextTaskProcessor = null;
+        }
+      }
+
+      try {
+        synchronized (this) {
+          do {
+            this.wait(waitSeconds * 1000);
+          } while (pause);
+        }
+      } catch (Exception ignore) {
+      }
+    }
+  }
+
+  /**
+   * @return the waitSeconds
+   */
+  public int getWaitSeconds() {
+    return waitSeconds;
+  }
+
+  /**
+   * @param waitSeconds
+   *          the waitSeconds to set
+   */
+  public void setWaitSeconds(int waitSeconds) {
+    this.waitSeconds = waitSeconds;
+  }
+
+  /**
+   * @return the running
+   */
+  public boolean isRunning() {
+    return running;
+  }
+
+  /**
+   * @param running
+   *          the running to set
+   */
+  public void setRunning(boolean running) {
+    this.running = running;
+  }
+
+  /**
+   * @return the pause
+   */
+  public boolean isPause() {
+    return pause;
+  }
+
+  /**
+   * @param pause
+   *          the pause to set
+   */
+  public void setPause(boolean pause) {
+    this.pause = pause;
+  }
+
+  protected WorkflowTask extractTaskFromProcessor(TaskProcessor taskProcessor) {
+    WorkflowInstance inst = taskProcessor.getWorkflowInstance();
+    ParentChildWorkflow workflow = inst.getParentChildWorkflow();
+    String taskId = inst.getCurrentTaskId();
+    for (WorkflowTask task : workflow.getTasks()) {
+      if (task.getTaskId().equals(taskId)) {
+        return task;
+      }
+    }
+
+    return null;
+  }
+
+}

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java Wed Aug  1 05:16:41 2012
@@ -84,6 +84,7 @@ public abstract class WorkflowProcessor 
     this.timesBlocked = 0;
     this.workflowInstance = new WorkflowInstance();
     this.lifecycleManager = lifecycleManager;
+    this.priority = Priority.getDefault();
   }
 
   /**

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java Wed Aug  1 05:16:41 2012
@@ -45,6 +45,9 @@ public class WorkflowProcessorBuilder {
 
   private WorkflowProcessorBuilder() {
     subProcessors = Lists.newArrayList();
+    this.id = null;
+    this.priority = -1;
+    this.lifecycleManager = null;
   }
 
   public static WorkflowProcessorBuilder aWorkflowProcessor() {
@@ -77,10 +80,10 @@ public class WorkflowProcessorBuilder {
   public WorkflowProcessor build(Class<? extends WorkflowProcessor> clazz)
       throws InstantiationException, IllegalAccessException {
     WorkflowProcessor wp = clazz.newInstance();
-    wp.getWorkflowInstance().setId(id);
-    wp.setLifecycleManager(lifecycleManager);
-    wp.setPriority(Priority.getPriority(priority));
-    wp.setSubProcessors(subProcessors);
+    if(this.id != null) wp.getWorkflowInstance().setId(id);
+    if(this.lifecycleManager != null) wp.setLifecycleManager(lifecycleManager);
+    if(this.priority != -1) wp.setPriority(Priority.getPriority(priority));
+    if(this.subProcessors != null) wp.setSubProcessors(subProcessors);
     return wp;
   }
 }
\ No newline at end of file

Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java?rev=1367859&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java Wed Aug  1 05:16:41 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.List;
+import java.util.Vector;
+
+/**
+ *
+ * A mock {@link WorkflowProcessorQueue} object for use in testing.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class MockProcessorQueue extends WorkflowProcessorQueue {
+  
+  private QuerierAndRunnerUtils utils;
+  
+  public MockProcessorQueue(){
+    this.utils = new QuerierAndRunnerUtils();
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
+   * ()
+   */
+  @Override
+  public synchronized List<WorkflowProcessor> getProcessors() {
+    List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>();
+    try {
+      processors.add(utils.getProcessor(10.0, "Success", "done"));
+      processors.add(utils.getProcessor(2.0, "Loaded", "initial"));
+      processors.add(utils.getProcessor(7.0, "Loaded", "initial"));
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+
+    return processors;
+  }
+
+}
\ No newline at end of file

Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java?rev=1367859&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java Wed Aug  1 05:16:41 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Vector;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.structs.Graph;
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.Priority;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+
+/**
+ * 
+ * Utilities for testing the {@link TaskQuerier} and {@link TaskRunner} thread
+ * classes.
+ * 
+ * @author mattmann
+ * @version $Revision$
+ * 
+ */
+public class QuerierAndRunnerUtils {
+
+  private int dateGen;
+  
+  public QuerierAndRunnerUtils() {
+    this.dateGen = 0;
+  }
+  
+  public WorkflowTask getTask(File testDir){
+    WorkflowTask task = new WorkflowTask();
+    task.setConditions(Collections.emptyList());
+    task.setRequiredMetFields(Collections.emptyList());
+    task.setTaskId("urn:cas:workflow:tester");
+    task.setTaskInstanceClassName(SimpleTester.class.getName());
+    task.setTaskName("Tester");
+    WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
+    config.addConfigProperty("TestDirPath",
+        testDir.getAbsolutePath().endsWith("/") ? testDir.getAbsolutePath()
+            : testDir.getAbsolutePath() + "/");
+    task.setTaskConfig(config);
+    return task;
+  }
+
+  public WorkflowProcessor getProcessor(double priority, String stateName,
+      String categoryName) throws InstantiationException,
+      IllegalAccessException, IOException {
+    WorkflowLifecycleManager lifecycleManager = new WorkflowLifecycleManager(
+        "./src/main/resources/examples/wengine/wengine-lifecycle.xml");
+    WorkflowInstance inst = new WorkflowInstance();
+    Date sd = new Date();
+    sd.setTime(sd.getTime() + (this.dateGen * 5000));
+    this.dateGen++;
+    inst.setStartDate(sd);
+    inst.setId("winst-" + priority);
+    ParentChildWorkflow workflow = new ParentChildWorkflow(new Graph());
+    workflow.setTasks(Collections.EMPTY_LIST);
+    inst.setWorkflow(workflow);
+    inst.setPriority(Priority.getPriority(priority));
+    WorkflowProcessorBuilder builder = WorkflowProcessorBuilder
+        .aWorkflowProcessor().withLifecycleManager(lifecycleManager)
+        .withPriority(priority);
+    SequentialProcessor processor = (SequentialProcessor) builder
+        .build(SequentialProcessor.class);
+    processor.setWorkflowInstance(inst);
+    processor.setState(lifecycleManager.getDefaultLifecycle().createState(
+        stateName, categoryName, ""));
+    List<WorkflowProcessor> runnables = new Vector<WorkflowProcessor>();
+    TaskProcessor taskProcessor = (TaskProcessor) builder
+        .build(TaskProcessor.class);
+    taskProcessor.setState(lifecycleManager.getDefaultLifecycle().createState(
+        "Queued", "waiting", ""));
+    ParentChildWorkflow taskWorkflow = new ParentChildWorkflow(new Graph());    
+    taskWorkflow.getTasks().add(getTask(getTmpPath()));
+    WorkflowInstance taskWorkflowInst = new WorkflowInstance();
+    taskWorkflowInst.setWorkflow(taskWorkflow);
+    taskWorkflowInst.setCurrentTaskId(taskWorkflow.getTasks().get(0).getTaskId());
+    taskProcessor.setWorkflowInstance(taskWorkflowInst);
+    runnables.add(taskProcessor);
+    processor.setSubProcessors(runnables);
+    return processor;
+  }
+  
+  private File getTmpPath() throws IOException{
+    File testDir = null;
+    String parentPath = File.createTempFile("test", "txt").getParentFile().getAbsolutePath();
+    parentPath = parentPath.endsWith("/") ? parentPath:parentPath + "/";
+    String testJobDirPath = parentPath + "jobs";
+    testDir = new File(testJobDirPath);
+    testDir.mkdirs();
+    return testDir;
+  }
+
+}

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java Wed Aug  1 05:16:41 2012
@@ -20,16 +20,15 @@ package org.apache.oodt.cas.workflow.eng
 //JDK imports
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
 import java.util.Calendar;
-import java.util.Collections;
 import java.util.Date;
 
+//APACHE imports
+import org.apache.commons.io.FileUtils;
+
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
-import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
 import org.apache.oodt.commons.date.DateUtils;
 import org.apache.oodt.commons.util.DateConvert;
 
@@ -53,21 +52,14 @@ public class TestAsynchronousLocalEngine
   private AsynchronousLocalEngineRunner runner;
 
   protected File testDir;
+  
+  private QuerierAndRunnerUtils utils;
 
   public void testRun() {
-    WorkflowTask task = new WorkflowTask();
-    task.setConditions(Collections.emptyList());
-    task.setRequiredMetFields(Collections.emptyList());
-    task.setTaskId("urn:cas:workflow:tester");
-    task.setTaskInstanceClassName(SimpleTester.class.getName());
-    task.setTaskName("Tester");
-    WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
-    config.addConfigProperty("TestDirPath",
-        testDir.getAbsolutePath().endsWith("/") ? testDir.getAbsolutePath()
-            : testDir.getAbsolutePath() + "/");
-    task.setTaskConfig(config);
+    WorkflowTask task = utils.getTask(testDir);
     Metadata met = new Metadata();
-    met.addMetadata("StartDateTime", DateUtils.toString(Calendar.getInstance()));
+    met.addMetadata("StartDateTime", DateUtils.toString(Calendar.getInstance()));   
+
     try {
       runner.execute(task, met);
       runner.execute(task, met);
@@ -84,12 +76,7 @@ public class TestAsynchronousLocalEngine
     for (File f : this.testDir.listFiles()) {
       BufferedReader br = null;
       try {
-        br = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
-        String line = null;
-        while ((line = br.readLine()) != null) {
-
-        }
-
+        String line = FileUtils.readFileToString(f);
         String[] toks = line.split(",");
         Date dateTime = DateConvert.isoParse(toks[1]);
         Seconds seconds = Seconds.secondsBetween(new DateTime(dateTime),
@@ -130,6 +117,7 @@ public class TestAsynchronousLocalEngine
     testDir = new File(testJobDirPath);
     testDir.mkdirs();
     this.runner = new AsynchronousLocalEngineRunner();
+    this.utils = new QuerierAndRunnerUtils();
   }
 
   /*

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java Wed Aug  1 05:16:41 2012
@@ -17,18 +17,8 @@
 
 package org.apache.oodt.cas.workflow.engine;
 
-//JDK imports
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Vector;
-
 //OODT imports
-import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
 import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
-import org.apache.oodt.cas.workflow.structs.Priority;
-import org.apache.oodt.cas.workflow.structs.Workflow;
-import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
 
 //Junit imports
 import junit.framework.TestCase;
@@ -48,6 +38,26 @@ public class TestTaskQuerier extends Tes
   public TestTaskQuerier() {
     this.dateGen = 0;
   }
+  
+  public void testGetNext(){
+    FILOPrioritySorter prioritizer = new FILOPrioritySorter();
+    MockProcessorQueue processorQueue = new MockProcessorQueue();
+    assertNotNull(processorQueue.getProcessors());
+    assertEquals(3, processorQueue.getProcessors().size());
+    TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+    Thread querierThread = new Thread(querier);
+    querierThread.start();
+    while (querier.getRunnableProcessors().size() != 2) {
+      assertNotNull(querier.getRunnableProcessors());
+    }
+
+    querier.setRunning(false);
+    assertNotNull(querier.getRunnableProcessors());
+    assertEquals(2, querier.getRunnableProcessors().size());
+    TaskProcessor next = querier.getNext();
+    assertNotNull(next);
+    assertEquals(1, querier.getRunnableProcessors().size());
+  }
 
   public void testGetRunnableProcessors() {
     FILOPrioritySorter prioritizer = new FILOPrioritySorter();
@@ -66,10 +76,10 @@ public class TestTaskQuerier extends Tes
     assertEquals(2, querier.getRunnableProcessors().size());
     assertNotNull(querier.getRunnableProcessors().get(0));
     assertNotNull(querier.getRunnableProcessors().get(0).getPriority());
-    assertEquals(2.0, querier.getRunnableProcessors().get(0).getPriority()
-        .getValue());
-    assertEquals(7.0, querier.getRunnableProcessors().get(1).getPriority()
-        .getValue());
+    assertEquals(2.1, querier.getRunnableProcessors().get(0).getPriority()
+        .getValue()); // extra .1 since it's a task
+    assertEquals(7.1, querier.getRunnableProcessors().get(1).getPriority()
+        .getValue()); // extra .1 since it's a task
     try{
       querierThread.join();
     }
@@ -77,63 +87,4 @@ public class TestTaskQuerier extends Tes
 
   }
 
-  private WorkflowProcessor getProcessor(double priority, String stateName,
-      String categoryName) throws InstantiationException, IllegalAccessException {
-    WorkflowLifecycleManager lifecycleManager = new WorkflowLifecycleManager(
-        "./src/main/resources/examples/wengine/wengine-lifecycle.xml");
-    WorkflowInstance inst = new WorkflowInstance();
-    Date sd = new Date();
-    sd.setTime(sd.getTime() + (this.dateGen * 5000));
-    this.dateGen++;
-    inst.setStartDate(sd);
-    inst.setId("winst-" + priority);
-    Workflow workflow = new Workflow();
-    workflow.setTasks(Collections.EMPTY_LIST);
-    inst.setWorkflow(workflow);
-    inst.setPriority(Priority.getPriority(priority));
-    WorkflowProcessorBuilder builder = WorkflowProcessorBuilder.aWorkflowProcessor()
-    .withLifecycleManager(lifecycleManager)
-    .withPriority(priority);
-    SequentialProcessor processor = (SequentialProcessor)builder.build(SequentialProcessor.class);
-    processor.setWorkflowInstance(inst);
-    processor.setState(lifecycleManager.getDefaultLifecycle().createState(
-        stateName, categoryName, ""));
-    assertNotNull(processor.getState());
-    assertNotNull(processor.getState().getCategory());
-    assertNotNull(processor.getState().getCategory().getName());
-    List<WorkflowProcessor> runnables = new Vector<WorkflowProcessor>();
-    TaskProcessor taskProcessor = (TaskProcessor)builder.build(TaskProcessor.class);
-    taskProcessor.setState(lifecycleManager.getDefaultLifecycle().createState(
-        "Queued", "waiting", ""));
-    runnables.add(taskProcessor);
-    processor.setSubProcessors(runnables);    
-    return processor;
-  }
-
-  class MockProcessorQueue extends WorkflowProcessorQueue {
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
-     * ()
-     */
-    @Override
-    public synchronized List<WorkflowProcessor> getProcessors() {
-      List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>();
-      try {
-        processors.add(getProcessor(10.0, "Success", "done"));
-        processors.add(getProcessor(2.0, "Loaded", "initial"));
-        processors.add(getProcessor(7.0, "Loaded", "initial"));
-      } catch (Exception e) {
-        e.printStackTrace();
-        fail(e.getMessage());
-      }
-
-      return processors;
-    }
-
-  }
-
 }