You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/18 05:16:21 UTC

svn commit: r1374506 - in /oodt/trunk/workflow/src: main/java/org/apache/oodt/cas/workflow/engine/ test/org/apache/oodt/cas/workflow/engine/

Author: mattmann
Date: Sat Aug 18 03:16:21 2012
New Revision: 1374506

URL: http://svn.apache.org/viewvc?rev=1374506&view=rev
Log:
- OODT-310 WIP: make the unit tests more predictable
   - MockProcessorQueue returns 3 processors at first, then returns 0 (they have been consumed)
   - TestTaskQuerier and TestTaskRunner now set temp variables to store queue results and then test them

Modified:
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
    oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
    oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java Sat Aug 18 03:16:21 2012
@@ -20,6 +20,8 @@ package org.apache.oodt.cas.workflow.eng
 //JDK imports
 import java.util.List;
 import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 //OODT imports
 import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
@@ -49,6 +51,8 @@ public class TaskQuerier implements Runn
   private List<WorkflowProcessor> runnableProcessors;
 
   private PrioritySorter prioritizer;
+  
+  private static final Logger LOG = Logger.getLogger(TaskQuerier.class.getName());
 
   /**
    * Constructs a new TaskQuerier with the given {@link WorkflowProcessorQueue},
@@ -92,7 +96,7 @@ public class TaskQuerier implements Runn
             for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
               tp.setState(lifecycle.createState("Executing", "running",
                   "Added to Runnable queue"));
-              System.out.println("Added processor with priority: ["+tp.getPriority()+"]");
+              LOG.log(Level.INFO, "Added processor with priority: ["+tp.getPriority()+"]");
               processorsToRun.add(tp);
             }
             

Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Sat Aug 18 03:16:21 2012
@@ -29,58 +29,52 @@ import org.apache.oodt.cas.workflow.stru
 import org.apache.oodt.cas.workflow.structs.WorkflowTask;
 
 /**
- *
+ * 
  * Implements the TaskRunner framework. Acts as a thread that works with the
  * TaskQuerier to take the next sorted (aka ones that have been sorted with the
  * Workflow PrioritySorter) task and then leverage the Engine's Runner to
  * execute the task.
- *
+ * 
  * The TaskRunner thread first pops a task off the list using
  * {@link TaskQuerier#getNext()} and then so long as the thread's
  * {@link #runner} has open slots as returned by
  * {@link EngineRunner#hasOpenSlots(WorkflowTask)}, and {@link #isPause()} is
  * false and {@link #isRunning()} is true, then the task is handed off to the
  * runner for execution.
- *
+ * 
  * The TaskRunner thread can be paused during which time it waits
  * {@link #waitSeconds} seconds, wakes up to see if it's unpaused, and then goes
  * back to sleep if not, otherwise, resumes executing if it was unpaused.
- *
+ * 
  * @since Apache OODT 0.5
- *
+ * 
  * @author mattmann
  * @author bfoster
  * @version $Revision$
- *
+ * 
  */
-//TODO(bfoster): Rename... Runner is missleading.
+// TODO(bfoster): Rename... Runner is missleading.
 public class TaskRunner implements Runnable {
 
   private boolean running;
 
-  private boolean pause;
-
   private final TaskQuerier taskQuerier;
 
   private final EngineRunner runner;
 
-  private int waitSeconds;
-
   private static final Logger LOG = Logger
       .getLogger(TaskRunner.class.getName());
 
   public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner,
       int waitSeconds) {
     this.running = true;
-    this.pause = false;
     this.taskQuerier = taskQuerier;
     this.runner = runner;
-    this.waitSeconds = waitSeconds;
   }
 
   /*
    * (non-Javadoc)
-   *
+   * 
    * @see java.lang.Runnable#run()
    */
   @Override
@@ -89,66 +83,29 @@ public class TaskRunner implements Runna
     TaskProcessor nextTaskProcessor = null;
 
     while (running) {
-      try {          
-        nextTaskProcessor = taskQuerier.getNext();
-        nextTask = nextTaskProcessor != null ? 
-            extractTaskFromProcessor(nextTaskProcessor):null;        
-        
-        while (running && !pause && nextTask != null 
-            && runner.hasOpenSlots(nextTask)) {
-              
-          // TODO: set Workflow met here?          
+      nextTaskProcessor = taskQuerier.getNext();
+      nextTask = nextTaskProcessor != null ? extractTaskFromProcessor(nextTaskProcessor)
+          : null;
+
+      try {
+        if (nextTaskProcessor != null && runner.hasOpenSlots(nextTask)) {
+          // TODO: set Workflow met here?
           runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
-          nextTaskProcessor = taskQuerier.getNext();
-          nextTask = nextTaskProcessor != null ? 
-              extractTaskFromProcessor(nextTaskProcessor):null;
         }
-      } 
-       catch(InterruptedException e){
-         this.running = false;
-         break;
-       }
-      catch (Exception e) {
+      } catch (Exception e) {
+        e.printStackTrace();
         LOG.log(
             Level.SEVERE,
             "Engine failed while submitting jobs to its runner : "
                 + e.getMessage(), e);
         if (nextTask != null) {
-          nextTaskProcessor.setState(nextTaskProcessor
-              .getLifecycleManager()
-              .getDefaultLifecycle()
-              .createState("Failure", "done",
-                  "Failed while submitting job to Runner : " + e.getMessage()));
+          this.flagProcessorAsFailed(nextTaskProcessor, e.getMessage());
           nextTask = null;
           nextTaskProcessor = null;
         }
       }
-
-      try {
-        synchronized (this) {
-          do {
-            this.wait(waitSeconds * 1000);
-          } while (pause);
-        }
-      } catch (Exception ignore) {
-      }
     }
-    
-  }
-
-  /**
-   * @return the waitSeconds
-   */
-  public int getWaitSeconds() {
-    return waitSeconds;
-  }
 
-  /**
-   * @param waitSeconds
-   *          the waitSeconds to set
-   */
-  public void setWaitSeconds(int waitSeconds) {
-    this.waitSeconds = waitSeconds;
   }
 
   /**
@@ -166,21 +123,6 @@ public class TaskRunner implements Runna
     this.running = running;
   }
 
-  /**
-   * @return the pause
-   */
-  public boolean isPause() {
-    return pause;
-  }
-
-  /**
-   * @param pause
-   *          the pause to set
-   */
-  public void setPause(boolean pause) {
-    this.pause = pause;
-  }
-
   protected WorkflowTask extractTaskFromProcessor(TaskProcessor taskProcessor) {
     WorkflowInstance inst = taskProcessor.getWorkflowInstance();
     ParentChildWorkflow workflow = inst.getParentChildWorkflow();
@@ -194,4 +136,13 @@ public class TaskRunner implements Runna
     return null;
   }
 
+  private void flagProcessorAsFailed(TaskProcessor nextTaskProcessor, String msg) {
+    nextTaskProcessor.setState(nextTaskProcessor
+        .getLifecycleManager()
+        .getDefaultLifecycle()
+        .createState("Failure", "done",
+            "Failed while submitting job to Runner : " + msg));
+
+  }
+
 }

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java Sat Aug 18 03:16:21 2012
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.oodt.cas.workflow.engine;
 
 //JDK imports
@@ -26,35 +25,40 @@ import org.apache.oodt.cas.workflow.engi
 import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
 
 /**
- *
+ * 
  * A mock {@link WorkflowProcessorQueue} object for use in testing.
- *
+ * 
  * @author mattmann
  * @version $Revision$
- *
+ * 
  */
 public class MockProcessorQueue extends WorkflowProcessorQueue {
-  
+
   private QuerierAndRunnerUtils utils;
-  
-  public MockProcessorQueue(){
+
+  private boolean consumed;
+
+  public MockProcessorQueue() {
     this.utils = new QuerierAndRunnerUtils();
+    this.consumed = false;
   }
 
   /*
    * (non-Javadoc)
    * 
    * @see
-   * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
-   * ()
+   * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors ()
    */
   @Override
   public synchronized List<WorkflowProcessor> getProcessors() {
     List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>();
     try {
-      processors.add(utils.getProcessor(10.0, "Success", "done"));
-      processors.add(utils.getProcessor(2.0, "Loaded", "initial"));
-      processors.add(utils.getProcessor(7.0, "Loaded", "initial"));
+      if (!consumed) {
+        processors.add(utils.getProcessor(10.0, "Success", "done"));
+        processors.add(utils.getProcessor(2.0, "Loaded", "initial"));
+        processors.add(utils.getProcessor(7.0, "Loaded", "initial"));
+        this.consumed = true;
+      }
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java Sat Aug 18 03:16:21 2012
@@ -18,7 +18,10 @@
 package org.apache.oodt.cas.workflow.engine;
 
 //OODT imports
+import java.util.List;
+
 import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
 import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
 
 //Junit imports
@@ -43,18 +46,22 @@ public class TestTaskQuerier extends Tes
   public void testGetNext(){
     FILOPrioritySorter prioritizer = new FILOPrioritySorter();
     MockProcessorQueue processorQueue = new MockProcessorQueue();
-    assertNotNull(processorQueue.getProcessors());
-    assertEquals(3, processorQueue.getProcessors().size());
+    List<WorkflowProcessor> queued = null;
+    assertNotNull(queued = processorQueue.getProcessors());
+    assertEquals(3, queued.size());
+    processorQueue = new MockProcessorQueue();
     TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
     Thread querierThread = new Thread(querier);
     querierThread.start();
-    while (querier.getRunnableProcessors().size() != 2) {
-      assertNotNull(querier.getRunnableProcessors());
+    List<WorkflowProcessor> runnables = null;
+    while ((runnables = querier.getRunnableProcessors()) != null && 
+        runnables.size() < 2) {
+      assertNotNull(runnables);
     }
 
     querier.setRunning(false);
-    assertNotNull(querier.getRunnableProcessors());
-    assertEquals(2, querier.getRunnableProcessors().size());
+    assertNotNull(runnables);
+    assertEquals(2, runnables.size());
     TaskProcessor next = querier.getNext();
     assertNotNull(next);
     assertEquals(1, querier.getRunnableProcessors().size());
@@ -62,24 +69,28 @@ public class TestTaskQuerier extends Tes
 
   public void testGetRunnableProcessors() {
     FILOPrioritySorter prioritizer = new FILOPrioritySorter();
-    MockProcessorQueue processorQueue = new MockProcessorQueue();
-    assertNotNull(processorQueue.getProcessors());
-    assertEquals(3, processorQueue.getProcessors().size());
+    MockProcessorQueue processorQueue = new MockProcessorQueue();    
+    List<WorkflowProcessor> queued = null;
+    assertNotNull(queued = processorQueue.getProcessors());
+    assertEquals(3, queued.size());
+    processorQueue = new MockProcessorQueue();
     TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
     Thread querierThread = new Thread(querier);
     querierThread.start();
-    while (querier.getRunnableProcessors().size() != 2) {
-      assertNotNull(querier.getRunnableProcessors());
+    List<WorkflowProcessor> runnables = null;
+    while ((runnables = querier.getRunnableProcessors()) != null && 
+        runnables.size() < 2) {
+      assertNotNull(runnables);
     }
 
     querier.setRunning(false);
-    assertNotNull(querier.getRunnableProcessors());
-    assertEquals(2, querier.getRunnableProcessors().size());
-    assertNotNull(querier.getRunnableProcessors().get(0));
-    assertNotNull(querier.getRunnableProcessors().get(0).getPriority());
-    assertEquals(2.1, querier.getRunnableProcessors().get(0).getPriority()
+    assertNotNull(runnables);
+    assertEquals(2, runnables.size());
+    assertNotNull(runnables.get(0));
+    assertNotNull(runnables.get(0).getPriority());
+    assertEquals(2.1, runnables.get(0).getPriority()
         .getValue()); // extra .1 since it's a task
-    assertEquals(7.1, querier.getRunnableProcessors().get(1).getPriority()
+    assertEquals(7.1, runnables.get(1).getPriority()
         .getValue()); // extra .1 since it's a task
     try{
       querierThread.join();

Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java Sat Aug 18 03:16:21 2012
@@ -74,7 +74,7 @@ public class TestTaskRunner extends Test
     runnerThread.start();
 
     while (!testDir.exists()
-        || (testDir.exists() && testDir.listFiles().length < 2)) {
+        || (testDir.exists() && testDir.listFiles().length != 2)) {
     }
 
     querier.setRunning(false);