You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/18 05:16:21 UTC
svn commit: r1374506 - in /oodt/trunk/workflow/src:
main/java/org/apache/oodt/cas/workflow/engine/
test/org/apache/oodt/cas/workflow/engine/
Author: mattmann
Date: Sat Aug 18 03:16:21 2012
New Revision: 1374506
URL: http://svn.apache.org/viewvc?rev=1374506&view=rev
Log:
- OODT-310 WIP: make the unit tests more predictable
- MockProcessorQueue returns 3 processors at first, then returns 0 (they have been consumed)
- TestTaskQuerier and TestTaskRunner now set temp variables to store queue results and then test them
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java Sat Aug 18 03:16:21 2012
@@ -20,6 +20,8 @@ package org.apache.oodt.cas.workflow.eng
//JDK imports
import java.util.List;
import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
//OODT imports
import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
@@ -49,6 +51,8 @@ public class TaskQuerier implements Runn
private List<WorkflowProcessor> runnableProcessors;
private PrioritySorter prioritizer;
+
+ private static final Logger LOG = Logger.getLogger(TaskQuerier.class.getName());
/**
* Constructs a new TaskQuerier with the given {@link WorkflowProcessorQueue},
@@ -92,7 +96,7 @@ public class TaskQuerier implements Runn
for (TaskProcessor tp : processor.getRunnableWorkflowProcessors()) {
tp.setState(lifecycle.createState("Executing", "running",
"Added to Runnable queue"));
- System.out.println("Added processor with priority: ["+tp.getPriority()+"]");
+ LOG.log(Level.INFO, "Added processor with priority: ["+tp.getPriority()+"]");
processorsToRun.add(tp);
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Sat Aug 18 03:16:21 2012
@@ -29,58 +29,52 @@ import org.apache.oodt.cas.workflow.stru
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
/**
- *
+ *
* Implements the TaskRunner framework. Acts as a thread that works with the
* TaskQuerier to take the next sorted (aka ones that have been sorted with the
* Workflow PrioritySorter) task and then leverage the Engine's Runner to
* execute the task.
- *
+ *
* The TaskRunner thread first pops a task off the list using
* {@link TaskQuerier#getNext()} and then so long as the thread's
* {@link #runner} has open slots as returned by
* {@link EngineRunner#hasOpenSlots(WorkflowTask)}, and {@link #isPause()} is
* false and {@link #isRunning()} is true, then the task is handed off to the
* runner for execution.
- *
+ *
* The TaskRunner thread can be paused during which time it waits
* {@link #waitSeconds} seconds, wakes up to see if it's unpaused, and then goes
* back to sleep if not, otherwise, resumes executing if it was unpaused.
- *
+ *
* @since Apache OODT 0.5
- *
+ *
* @author mattmann
* @author bfoster
* @version $Revision$
- *
+ *
*/
-//TODO(bfoster): Rename... Runner is missleading.
+// TODO(bfoster): Rename... Runner is missleading.
public class TaskRunner implements Runnable {
private boolean running;
- private boolean pause;
-
private final TaskQuerier taskQuerier;
private final EngineRunner runner;
- private int waitSeconds;
-
private static final Logger LOG = Logger
.getLogger(TaskRunner.class.getName());
public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner,
int waitSeconds) {
this.running = true;
- this.pause = false;
this.taskQuerier = taskQuerier;
this.runner = runner;
- this.waitSeconds = waitSeconds;
}
/*
* (non-Javadoc)
- *
+ *
* @see java.lang.Runnable#run()
*/
@Override
@@ -89,66 +83,29 @@ public class TaskRunner implements Runna
TaskProcessor nextTaskProcessor = null;
while (running) {
- try {
- nextTaskProcessor = taskQuerier.getNext();
- nextTask = nextTaskProcessor != null ?
- extractTaskFromProcessor(nextTaskProcessor):null;
-
- while (running && !pause && nextTask != null
- && runner.hasOpenSlots(nextTask)) {
-
- // TODO: set Workflow met here?
+ nextTaskProcessor = taskQuerier.getNext();
+ nextTask = nextTaskProcessor != null ? extractTaskFromProcessor(nextTaskProcessor)
+ : null;
+
+ try {
+ if (nextTaskProcessor != null && runner.hasOpenSlots(nextTask)) {
+ // TODO: set Workflow met here?
runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
- nextTaskProcessor = taskQuerier.getNext();
- nextTask = nextTaskProcessor != null ?
- extractTaskFromProcessor(nextTaskProcessor):null;
}
- }
- catch(InterruptedException e){
- this.running = false;
- break;
- }
- catch (Exception e) {
+ } catch (Exception e) {
+ e.printStackTrace();
LOG.log(
Level.SEVERE,
"Engine failed while submitting jobs to its runner : "
+ e.getMessage(), e);
if (nextTask != null) {
- nextTaskProcessor.setState(nextTaskProcessor
- .getLifecycleManager()
- .getDefaultLifecycle()
- .createState("Failure", "done",
- "Failed while submitting job to Runner : " + e.getMessage()));
+ this.flagProcessorAsFailed(nextTaskProcessor, e.getMessage());
nextTask = null;
nextTaskProcessor = null;
}
}
-
- try {
- synchronized (this) {
- do {
- this.wait(waitSeconds * 1000);
- } while (pause);
- }
- } catch (Exception ignore) {
- }
}
-
- }
-
- /**
- * @return the waitSeconds
- */
- public int getWaitSeconds() {
- return waitSeconds;
- }
- /**
- * @param waitSeconds
- * the waitSeconds to set
- */
- public void setWaitSeconds(int waitSeconds) {
- this.waitSeconds = waitSeconds;
}
/**
@@ -166,21 +123,6 @@ public class TaskRunner implements Runna
this.running = running;
}
- /**
- * @return the pause
- */
- public boolean isPause() {
- return pause;
- }
-
- /**
- * @param pause
- * the pause to set
- */
- public void setPause(boolean pause) {
- this.pause = pause;
- }
-
protected WorkflowTask extractTaskFromProcessor(TaskProcessor taskProcessor) {
WorkflowInstance inst = taskProcessor.getWorkflowInstance();
ParentChildWorkflow workflow = inst.getParentChildWorkflow();
@@ -194,4 +136,13 @@ public class TaskRunner implements Runna
return null;
}
+ private void flagProcessorAsFailed(TaskProcessor nextTaskProcessor, String msg) {
+ nextTaskProcessor.setState(nextTaskProcessor
+ .getLifecycleManager()
+ .getDefaultLifecycle()
+ .createState("Failure", "done",
+ "Failed while submitting job to Runner : " + msg));
+
+ }
+
}
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java Sat Aug 18 03:16:21 2012
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-
package org.apache.oodt.cas.workflow.engine;
//JDK imports
@@ -26,35 +25,40 @@ import org.apache.oodt.cas.workflow.engi
import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessorQueue;
/**
- *
+ *
* A mock {@link WorkflowProcessorQueue} object for use in testing.
- *
+ *
* @author mattmann
* @version $Revision$
- *
+ *
*/
public class MockProcessorQueue extends WorkflowProcessorQueue {
-
+
private QuerierAndRunnerUtils utils;
-
- public MockProcessorQueue(){
+
+ private boolean consumed;
+
+ public MockProcessorQueue() {
this.utils = new QuerierAndRunnerUtils();
+ this.consumed = false;
}
/*
* (non-Javadoc)
*
* @see
- * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
- * ()
+ * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors ()
*/
@Override
public synchronized List<WorkflowProcessor> getProcessors() {
List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>();
try {
- processors.add(utils.getProcessor(10.0, "Success", "done"));
- processors.add(utils.getProcessor(2.0, "Loaded", "initial"));
- processors.add(utils.getProcessor(7.0, "Loaded", "initial"));
+ if (!consumed) {
+ processors.add(utils.getProcessor(10.0, "Success", "done"));
+ processors.add(utils.getProcessor(2.0, "Loaded", "initial"));
+ processors.add(utils.getProcessor(7.0, "Loaded", "initial"));
+ this.consumed = true;
+ }
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java Sat Aug 18 03:16:21 2012
@@ -18,7 +18,10 @@
package org.apache.oodt.cas.workflow.engine;
//OODT imports
+import java.util.List;
+
import org.apache.oodt.cas.workflow.engine.processor.TaskProcessor;
+import org.apache.oodt.cas.workflow.engine.processor.WorkflowProcessor;
import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
//Junit imports
@@ -43,18 +46,22 @@ public class TestTaskQuerier extends Tes
public void testGetNext(){
FILOPrioritySorter prioritizer = new FILOPrioritySorter();
MockProcessorQueue processorQueue = new MockProcessorQueue();
- assertNotNull(processorQueue.getProcessors());
- assertEquals(3, processorQueue.getProcessors().size());
+ List<WorkflowProcessor> queued = null;
+ assertNotNull(queued = processorQueue.getProcessors());
+ assertEquals(3, queued.size());
+ processorQueue = new MockProcessorQueue();
TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
Thread querierThread = new Thread(querier);
querierThread.start();
- while (querier.getRunnableProcessors().size() != 2) {
- assertNotNull(querier.getRunnableProcessors());
+ List<WorkflowProcessor> runnables = null;
+ while ((runnables = querier.getRunnableProcessors()) != null &&
+ runnables.size() < 2) {
+ assertNotNull(runnables);
}
querier.setRunning(false);
- assertNotNull(querier.getRunnableProcessors());
- assertEquals(2, querier.getRunnableProcessors().size());
+ assertNotNull(runnables);
+ assertEquals(2, runnables.size());
TaskProcessor next = querier.getNext();
assertNotNull(next);
assertEquals(1, querier.getRunnableProcessors().size());
@@ -62,24 +69,28 @@ public class TestTaskQuerier extends Tes
public void testGetRunnableProcessors() {
FILOPrioritySorter prioritizer = new FILOPrioritySorter();
- MockProcessorQueue processorQueue = new MockProcessorQueue();
- assertNotNull(processorQueue.getProcessors());
- assertEquals(3, processorQueue.getProcessors().size());
+ MockProcessorQueue processorQueue = new MockProcessorQueue();
+ List<WorkflowProcessor> queued = null;
+ assertNotNull(queued = processorQueue.getProcessors());
+ assertEquals(3, queued.size());
+ processorQueue = new MockProcessorQueue();
TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
Thread querierThread = new Thread(querier);
querierThread.start();
- while (querier.getRunnableProcessors().size() != 2) {
- assertNotNull(querier.getRunnableProcessors());
+ List<WorkflowProcessor> runnables = null;
+ while ((runnables = querier.getRunnableProcessors()) != null &&
+ runnables.size() < 2) {
+ assertNotNull(runnables);
}
querier.setRunning(false);
- assertNotNull(querier.getRunnableProcessors());
- assertEquals(2, querier.getRunnableProcessors().size());
- assertNotNull(querier.getRunnableProcessors().get(0));
- assertNotNull(querier.getRunnableProcessors().get(0).getPriority());
- assertEquals(2.1, querier.getRunnableProcessors().get(0).getPriority()
+ assertNotNull(runnables);
+ assertEquals(2, runnables.size());
+ assertNotNull(runnables.get(0));
+ assertNotNull(runnables.get(0).getPriority());
+ assertEquals(2.1, runnables.get(0).getPriority()
.getValue()); // extra .1 since it's a task
- assertEquals(7.1, querier.getRunnableProcessors().get(1).getPriority()
+ assertEquals(7.1, runnables.get(1).getPriority()
.getValue()); // extra .1 since it's a task
try{
querierThread.join();
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java?rev=1374506&r1=1374505&r2=1374506&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java Sat Aug 18 03:16:21 2012
@@ -74,7 +74,7 @@ public class TestTaskRunner extends Test
runnerThread.start();
while (!testDir.exists()
- || (testDir.exists() && testDir.listFiles().length < 2)) {
+ || (testDir.exists() && testDir.listFiles().length != 2)) {
}
querier.setRunning(false);