You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/01 07:16:41 UTC
svn commit: r1367859 - in /oodt/trunk/workflow/src:
main/java/org/apache/oodt/cas/workflow/engine/
test/org/apache/oodt/cas/workflow/engine/
Author: mattmann
Date: Wed Aug 1 05:16:41 2012
New Revision: 1367859
URL: http://svn.apache.org/viewvc?rev=1367859&view=rev
Log:
- OODT-310: WIP: Port WEngine to trunk
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/AsynchronousLocalEngineRunner.java Wed Aug 1 05:16:41 2012
@@ -133,4 +133,13 @@ public class AsynchronousLocalEngineRunn
}
+ /* (non-Javadoc)
+ * @see org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+ *
+ * Placeholder implementation: always reports an open slot (returns true)
+ * without inspecting the given task.
+ */
+ @Override
+ public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+ // TODO: replace this generated stub with a real capacity check; workflowTask is currently ignored
+ return true;
+ }
+
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/EngineRunner.java Wed Aug 1 05:16:41 2012
@@ -57,5 +57,15 @@ public abstract class EngineRunner {
*
*/
public abstract void shutdown() throws Exception;
+
+ /**
+ * Reports whether this runner currently has an available slot in which the
+ * provided {@link WorkflowTask} could be executed.
+ *
+ * @param workflowTask The {@link WorkflowTask} that is to be executed.
+ * @return <code>true</code> if there is an open slot, <code>false</code> otherwise.
+ * @throws Exception If any error occurs while checking for an open slot.
+ */
+ public abstract boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception;
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Wed Aug 1 05:16:41 2012
@@ -71,7 +71,7 @@ public class PrioritizedQueueBasedWorkfl
prioritizer;
this.wmgrUrl = null;
this.conditionWait = conditionWait;
- //this.processorQueue = Collections.synchronizedMap(new HashMap<String, CachedWorkflowProcessor>());
+ this.processorQueue = new WorkflowProcessorQueue();
this.runnableTasks = new Vector<WorkflowProcessor>();
this.executingTasks = Collections.synchronizedMap(new HashMap<String, WorkflowProcessor>());
//this.processorLock = new WorkflowProcessorLock();
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ResourceRunner.java Wed Aug 1 05:16:41 2012
@@ -107,6 +107,17 @@ public class ResourceRunner extends Engi
// TODO Auto-generated method stub
}
+
+
+ /* (non-Javadoc)
+ * @see org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+ *
+ * Placeholder implementation: always reports that no slot is open (returns
+ * false) without inspecting the given task.
+ * NOTE(review): the TaskRunner added in this commit only calls execute()
+ * while hasOpenSlots() returns true, so as written it would never dispatch
+ * a task to this runner -- confirm that is acceptable for the WIP port.
+ */
+ @Override
+ public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+ // TODO: replace this generated stub with a real capacity check; workflowTask is currently ignored
+ return false;
+ }
+
protected boolean safeCheckJobComplete(String jobId) {
try {
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/SynchronousLocalEngineRunner.java Wed Aug 1 05:16:41 2012
@@ -77,4 +77,13 @@ public class SynchronousLocalEngineRunne
}
+ /* (non-Javadoc)
+ * @see org.apache.oodt.cas.workflow.engine.EngineRunner#hasOpenSlots(org.apache.oodt.cas.workflow.structs.WorkflowTask)
+ *
+ * Placeholder implementation: always reports that no slot is open (returns
+ * false) without inspecting the given task.
+ * NOTE(review): the TaskRunner added in this commit only calls execute()
+ * while hasOpenSlots() returns true, so as written it would never dispatch
+ * a task to this runner -- confirm that is acceptable for the WIP port.
+ */
+ @Override
+ public boolean hasOpenSlots(WorkflowTask workflowTask) throws Exception {
+ // TODO: replace this generated stub with a real capacity check; workflowTask is currently ignored
+ return false;
+ }
+
}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskQuerier.java Wed Aug 1 05:16:41 2012
@@ -90,13 +90,13 @@ public class TaskQuerier implements Runn
tp.setState(lifecycle.createState("Executing", "running",
"Added to Runnable queue"));
System.out.println("Added processor with priority: ["+tp.getPriority()+"]");
- processorsToRun.add(processor);
+ processorsToRun.add(tp);
}
prioritizer.sort(processorsToRun);
synchronized(runnableProcessors){
- runnableProcessors = processorsToRun;
+ if(running) runnableProcessors = processorsToRun;
}
} else {
@@ -127,6 +127,19 @@ public class TaskQuerier implements Runn
public List<WorkflowProcessor> getRunnableProcessors() {
return runnableProcessors;
}
+
+ /**
+  * Removes and returns the next available {@link TaskProcessor} from the
+  * head of the {@link #runnableProcessors} {@link List}.
+  * <p>
+  * The list reference is read once, and the emptiness check and the removal
+  * are performed while holding that list's monitor, so the list cannot be
+  * swapped or drained between the two steps.
+  *
+  * @return The next available {@link TaskProcessor}, or <code>null</code> if
+  *         there are currently no runnable processors.
+  */
+ public TaskProcessor getNext() {
+   // Snapshot the reference: the querier thread replaces runnableProcessors
+   // wholesale, so two separate getRunnableProcessors() calls could observe
+   // two different lists (a non-empty one for the check, an empty one for
+   // the removal).
+   List<WorkflowProcessor> processors = getRunnableProcessors();
+   if (processors == null) {
+     return null;
+   }
+
+   synchronized (processors) {
+     if (processors.isEmpty()) {
+       return null;
+     }
+     return (TaskProcessor) processors.remove(0);
+   }
+ }
private WorkflowLifecycle getLifecycleForProcessor(WorkflowProcessor processor) {
if (processor.getWorkflowInstance() != null
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1367859&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Wed Aug 1 05:16:41 2012
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+
+/**
+ *
+ * Implements the TaskRunner framework. Acts as a thread that works with the
+ * {@link TaskQuerier} to take the next task from its sorted list and then
+ * leverages the Engine's {@link EngineRunner} to execute that task.
+ *
+ * The TaskRunner thread first pops a task off the list using
+ * {@link TaskQuerier#getNext()} and then, so long as the thread's
+ * {@link #runner} has open slots as returned by
+ * {@link EngineRunner#hasOpenSlots(WorkflowTask)}, {@link #isPause()} is
+ * false and {@link #isRunning()} is true, hands the task off to the runner for
+ * execution. When the querier has nothing to offer, the thread simply waits
+ * for the next cycle.
+ *
+ * The TaskRunner thread can be paused, during which time it waits
+ * {@link #waitSeconds} seconds, wakes up to see if it's unpaused, and then goes
+ * back to sleep if not; otherwise it resumes executing. Changing the pause or
+ * running flag also wakes a waiting runner so the change is noticed promptly.
+ *
+ * @since Apache OODT 0.5
+ *
+ * @author mattmann
+ * @author bfoster
+ * @version $Revision$
+ *
+ */
+public class TaskRunner implements Runnable {
+
+  private static final Logger LOG = Logger
+      .getLogger(TaskRunner.class.getName());
+
+  /* Breather taken after each submission. FIXME: make this configurable */
+  private static final long SUBMIT_PAUSE_MILLIS = 1000L;
+
+  /*
+   * These flags are written by controlling threads and read by the runner
+   * thread, so they are volatile to make every update visible to the loop.
+   */
+  private volatile boolean running;
+
+  private volatile boolean pause;
+
+  private volatile int waitSeconds;
+
+  private TaskQuerier taskQuerier;
+
+  private EngineRunner runner;
+
+  /**
+   * Creates a runner that starts out running and unpaused.
+   *
+   * @param taskQuerier
+   *          the source of the next {@link TaskProcessor} to run
+   * @param runner
+   *          the {@link EngineRunner} that tasks are handed to
+   * @param waitSeconds
+   *          the number of seconds to wait between cycles
+   */
+  public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner,
+      int waitSeconds) {
+    this.running = true;
+    this.pause = false;
+    this.taskQuerier = taskQuerier;
+    this.runner = runner;
+    this.waitSeconds = waitSeconds;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see java.lang.Runnable#run()
+   */
+  @Override
+  public void run() {
+    WorkflowTask nextTask = null;
+    TaskProcessor nextTaskProcessor = null;
+
+    while (running) {
+      try {
+        if (nextTaskProcessor == null) {
+          nextTaskProcessor = taskQuerier.getNext();
+          nextTask = extractTaskFromProcessor(nextTaskProcessor);
+        }
+
+        if (nextTaskProcessor != null && nextTask == null) {
+          // The processor names no task that can be found in its workflow.
+          // Fail it and let go of it; holding on would stall this runner
+          // forever, because a new processor is only fetched once the
+          // current one has been released.
+          LOG.log(Level.WARNING,
+              "Unable to resolve the current task for a runnable processor: "
+                  + "marking it as failed");
+          markFailed(nextTaskProcessor,
+              "Unable to resolve the current task for this processor");
+          nextTaskProcessor = null;
+        }
+
+        while (running && !pause && nextTask != null
+            && runner.hasOpenSlots(nextTask)) {
+          // TODO: set Workflow met here?
+          runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
+
+          // The hand-off succeeded: forget the submitted pair before fetching
+          // the next one so that a failure during the fetch is never blamed
+          // on (or dereferenced through) stale references.
+          nextTask = null;
+          nextTaskProcessor = null;
+          nextTaskProcessor = taskQuerier.getNext();
+          nextTask = extractTaskFromProcessor(nextTaskProcessor);
+
+          // take a breather
+          try {
+            Thread.sleep(SUBMIT_PAUSE_MILLIS);
+          } catch (InterruptedException ignored) {
+            // best effort: an interrupt merely shortens the breather
+          }
+        }
+      } catch (Exception e) {
+        LOG.log(
+            Level.SEVERE,
+            "Engine failed while submitting jobs to its runner : "
+                + e.getMessage(), e);
+        if (nextTaskProcessor != null) {
+          markFailed(nextTaskProcessor,
+              "Failed while submitting job to Runner : " + e.getMessage());
+        }
+        nextTask = null;
+        nextTaskProcessor = null;
+      }
+
+      try {
+        synchronized (this) {
+          do {
+            if (!running) {
+              // stop requested before we got here: do not start another wait
+              break;
+            }
+            // a timeout of 0 waits until notified (see Object#wait(long))
+            this.wait(waitSeconds * 1000L);
+          } while (pause && running); // a stop request must end a pause too
+        }
+      } catch (Exception ignored) {
+        // best effort: an interrupt or an invalid timeout just ends this wait
+      }
+    }
+  }
+
+  /**
+   * @return the waitSeconds
+   */
+  public int getWaitSeconds() {
+    return waitSeconds;
+  }
+
+  /**
+   * @param waitSeconds
+   *          the waitSeconds to set
+   */
+  public void setWaitSeconds(int waitSeconds) {
+    this.waitSeconds = waitSeconds;
+  }
+
+  /**
+   * @return the running
+   */
+  public boolean isRunning() {
+    return running;
+  }
+
+  /**
+   * Sets the running flag and wakes the runner thread if it is waiting, so
+   * that a stop request is noticed without sitting out the current wait.
+   *
+   * @param running
+   *          the running to set
+   */
+  public void setRunning(boolean running) {
+    this.running = running;
+    wakeUp();
+  }
+
+  /**
+   * @return the pause
+   */
+  public boolean isPause() {
+    return pause;
+  }
+
+  /**
+   * Sets the pause flag and wakes the runner thread if it is waiting, so that
+   * it re-checks the flag.
+   *
+   * @param pause
+   *          the pause to set
+   */
+  public void setPause(boolean pause) {
+    this.pause = pause;
+    wakeUp();
+  }
+
+  /**
+   * Finds the {@link WorkflowTask} that the given processor's workflow
+   * instance names as its current task.
+   *
+   * @param taskProcessor
+   *          the processor to inspect; may be <code>null</code>
+   * @return the matching {@link WorkflowTask}, or <code>null</code> if the
+   *         processor is <code>null</code>, has no workflow instance,
+   *         workflow or current task id, or if no task in the workflow has
+   *         that id
+   */
+  protected WorkflowTask extractTaskFromProcessor(TaskProcessor taskProcessor) {
+    if (taskProcessor == null) {
+      // TaskQuerier#getNext() hands back null when nothing is runnable
+      return null;
+    }
+
+    WorkflowInstance inst = taskProcessor.getWorkflowInstance();
+    if (inst == null) {
+      return null;
+    }
+
+    ParentChildWorkflow workflow = inst.getParentChildWorkflow();
+    String taskId = inst.getCurrentTaskId();
+    if (workflow == null || workflow.getTasks() == null || taskId == null) {
+      return null;
+    }
+
+    for (WorkflowTask task : workflow.getTasks()) {
+      if (taskId.equals(task.getTaskId())) {
+        return task;
+      }
+    }
+
+    return null;
+  }
+
+  /*
+   * Moves the processor into the "Failure" state with the given message.
+   * Never throws: this is called from the run loop's error handling, where an
+   * escaping exception would terminate the runner thread.
+   */
+  private void markFailed(TaskProcessor processor, String message) {
+    try {
+      processor.setState(processor.getLifecycleManager()
+          .getDefaultLifecycle().createState("Failure", "done", message));
+    } catch (RuntimeException e) {
+      LOG.log(Level.WARNING,
+          "Unable to record Failure state on processor : " + e.getMessage(), e);
+    }
+  }
+
+  /* Wakes the runner thread if it is currently inside its timed wait. */
+  private synchronized void wakeUp() {
+    this.notifyAll();
+  }
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessor.java Wed Aug 1 05:16:41 2012
@@ -84,6 +84,7 @@ public abstract class WorkflowProcessor
this.timesBlocked = 0;
this.workflowInstance = new WorkflowInstance();
this.lifecycleManager = lifecycleManager;
+ this.priority = Priority.getDefault();
}
/**
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/WorkflowProcessorBuilder.java Wed Aug 1 05:16:41 2012
@@ -45,6 +45,9 @@ public class WorkflowProcessorBuilder {
private WorkflowProcessorBuilder() {
subProcessors = Lists.newArrayList();
+ this.id = null;
+ this.priority = -1;
+ this.lifecycleManager = null;
}
public static WorkflowProcessorBuilder aWorkflowProcessor() {
@@ -77,10 +80,10 @@ public class WorkflowProcessorBuilder {
public WorkflowProcessor build(Class<? extends WorkflowProcessor> clazz)
throws InstantiationException, IllegalAccessException {
WorkflowProcessor wp = clazz.newInstance();
- wp.getWorkflowInstance().setId(id);
- wp.setLifecycleManager(lifecycleManager);
- wp.setPriority(Priority.getPriority(priority));
- wp.setSubProcessors(subProcessors);
+ if(this.id != null) wp.getWorkflowInstance().setId(id);
+ if(this.lifecycleManager != null) wp.setLifecycleManager(lifecycleManager);
+ if(this.priority != -1) wp.setPriority(Priority.getPriority(priority));
+ if(this.subProcessors != null) wp.setSubProcessors(subProcessors);
return wp;
}
}
\ No newline at end of file
Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java?rev=1367859&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/MockProcessorQueue.java Wed Aug 1 05:16:41 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.util.List;
+import java.util.Vector;
+
+/**
+ *
+ * A {@link WorkflowProcessorQueue} stand-in for tests: every call to
+ * {@link #getProcessors()} produces the same three freshly built processors.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class MockProcessorQueue extends WorkflowProcessorQueue {
+
+  private QuerierAndRunnerUtils utils;
+
+  public MockProcessorQueue(){
+    this.utils = new QuerierAndRunnerUtils();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Builds, in this order, processors with priority 10.0 (Success/done),
+   * 2.0 (Loaded/initial) and 7.0 (Loaded/initial). Any failure while building
+   * one is printed and rethrown wrapped in a RuntimeException.
+   *
+   * @see
+   * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
+   * ()
+   */
+  @Override
+  public synchronized List<WorkflowProcessor> getProcessors() {
+    final double[] priorities = { 10.0, 2.0, 7.0 };
+    final String[] stateNames = { "Success", "Loaded", "Loaded" };
+    final String[] categoryNames = { "done", "initial", "initial" };
+
+    List<WorkflowProcessor> mocks = new Vector<WorkflowProcessor>();
+    for (int i = 0; i < priorities.length; i++) {
+      try {
+        mocks.add(utils.getProcessor(priorities[i], stateNames[i],
+            categoryNames[i]));
+      } catch (Exception buildFailure) {
+        buildFailure.printStackTrace();
+        throw new RuntimeException(buildFailure);
+      }
+    }
+
+    return mocks;
+  }
+
+}
\ No newline at end of file
Added: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java?rev=1367859&view=auto
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java (added)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/QuerierAndRunnerUtils.java Wed Aug 1 05:16:41 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+//JDK imports
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Vector;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
+import org.apache.oodt.cas.workflow.structs.Graph;
+import org.apache.oodt.cas.workflow.structs.ParentChildWorkflow;
+import org.apache.oodt.cas.workflow.structs.Priority;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
+
+/**
+ *
+ * Utilities for testing the {@link TaskQuerier} and {@link TaskRunner} thread
+ * classes.
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class QuerierAndRunnerUtils {
+
+  /* Number of processors built so far; spreads their start dates apart. */
+  private int dateGen;
+
+  public QuerierAndRunnerUtils() {
+    this.dateGen = 0;
+  }
+
+  /**
+   * Builds a {@link WorkflowTask} backed by {@link SimpleTester}, with no
+   * conditions or required metadata fields.
+   *
+   * @param testDir
+   *          directory whose absolute path (always ending in "/") is stored
+   *          in the task configuration under the "TestDirPath" property
+   * @return the configured task, with id "urn:cas:workflow:tester"
+   */
+  public WorkflowTask getTask(File testDir){
+    WorkflowTask task = new WorkflowTask();
+    task.setConditions(Collections.emptyList());
+    task.setRequiredMetFields(Collections.emptyList());
+    task.setTaskId("urn:cas:workflow:tester");
+    task.setTaskInstanceClassName(SimpleTester.class.getName());
+    task.setTaskName("Tester");
+    String testDirPath = testDir.getAbsolutePath();
+    WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
+    config.addConfigProperty("TestDirPath",
+        testDirPath.endsWith("/") ? testDirPath : testDirPath + "/");
+    task.setTaskConfig(config);
+    return task;
+  }
+
+  /**
+   * Builds a {@link SequentialProcessor} in the requested state that wraps a
+   * single {@link TaskProcessor} sub-processor. The sub-processor is placed
+   * in the "Queued"/"waiting" state and its workflow instance points at one
+   * task created by {@link #getTask(File)}.
+   *
+   * Each call shifts the workflow instance's start date a further 5 seconds
+   * past the current time than the previous call did.
+   *
+   * @param priority
+   *          priority for the workflow instance and both processors; also
+   *          used to form the instance id ("winst-" + priority)
+   * @param stateName
+   *          name of the state given to the sequential processor
+   * @param categoryName
+   *          category of the state given to the sequential processor
+   * @return the assembled processor
+   * @throws InstantiationException
+   *           if a processor class cannot be instantiated
+   * @throws IllegalAccessException
+   *           if a processor class cannot be accessed
+   * @throws IOException
+   *           if the temporary job directory cannot be created
+   */
+  public WorkflowProcessor getProcessor(double priority, String stateName,
+      String categoryName) throws InstantiationException,
+      IllegalAccessException, IOException {
+    WorkflowLifecycleManager lifecycleManager = new WorkflowLifecycleManager(
+        "./src/main/resources/examples/wengine/wengine-lifecycle.xml");
+    WorkflowInstance inst = new WorkflowInstance();
+    Date sd = new Date();
+    sd.setTime(sd.getTime() + (this.dateGen * 5000));
+    this.dateGen++;
+    inst.setStartDate(sd);
+    inst.setId("winst-" + priority);
+    ParentChildWorkflow workflow = new ParentChildWorkflow(new Graph());
+    workflow.setTasks(Collections.EMPTY_LIST);
+    inst.setWorkflow(workflow);
+    inst.setPriority(Priority.getPriority(priority));
+    WorkflowProcessorBuilder builder = WorkflowProcessorBuilder
+        .aWorkflowProcessor().withLifecycleManager(lifecycleManager)
+        .withPriority(priority);
+    SequentialProcessor processor = (SequentialProcessor) builder
+        .build(SequentialProcessor.class);
+    processor.setWorkflowInstance(inst);
+    processor.setState(lifecycleManager.getDefaultLifecycle().createState(
+        stateName, categoryName, ""));
+    List<WorkflowProcessor> runnables = new Vector<WorkflowProcessor>();
+    TaskProcessor taskProcessor = (TaskProcessor) builder
+        .build(TaskProcessor.class);
+    taskProcessor.setState(lifecycleManager.getDefaultLifecycle().createState(
+        "Queued", "waiting", ""));
+    ParentChildWorkflow taskWorkflow = new ParentChildWorkflow(new Graph());
+    taskWorkflow.getTasks().add(getTask(getTmpPath()));
+    WorkflowInstance taskWorkflowInst = new WorkflowInstance();
+    taskWorkflowInst.setWorkflow(taskWorkflow);
+    taskWorkflowInst.setCurrentTaskId(taskWorkflow.getTasks().get(0).getTaskId());
+    taskProcessor.setWorkflowInstance(taskWorkflowInst);
+    runnables.add(taskProcessor);
+    processor.setSubProcessors(runnables);
+    return processor;
+  }
+
+  /*
+   * Returns the "jobs" directory under the JVM temp directory, creating it if
+   * necessary. The temp directory is read from the java.io.tmpdir system
+   * property rather than discovered by creating a throw-away temp file, which
+   * would leave one stray file behind on every call.
+   */
+  private File getTmpPath() throws IOException{
+    File testDir = new File(System.getProperty("java.io.tmpdir"), "jobs")
+        .getAbsoluteFile();
+    // re-check isDirectory() after mkdirs() in case another thread created it
+    if (!testDir.isDirectory() && !testDir.mkdirs() && !testDir.isDirectory()) {
+      throw new IOException("Unable to create test job directory: ["
+          + testDir.getAbsolutePath() + "]");
+    }
+    return testDir;
+  }
+
+}
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestAsynchronousLocalEngineRunner.java Wed Aug 1 05:16:41 2012
@@ -20,16 +20,15 @@ package org.apache.oodt.cas.workflow.eng
//JDK imports
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
import java.util.Calendar;
-import java.util.Collections;
import java.util.Date;
+//APACHE imports
+import org.apache.commons.io.FileUtils;
+
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
-import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
import org.apache.oodt.commons.date.DateUtils;
import org.apache.oodt.commons.util.DateConvert;
@@ -53,21 +52,14 @@ public class TestAsynchronousLocalEngine
private AsynchronousLocalEngineRunner runner;
protected File testDir;
+
+ private QuerierAndRunnerUtils utils;
public void testRun() {
- WorkflowTask task = new WorkflowTask();
- task.setConditions(Collections.emptyList());
- task.setRequiredMetFields(Collections.emptyList());
- task.setTaskId("urn:cas:workflow:tester");
- task.setTaskInstanceClassName(SimpleTester.class.getName());
- task.setTaskName("Tester");
- WorkflowTaskConfiguration config = new WorkflowTaskConfiguration();
- config.addConfigProperty("TestDirPath",
- testDir.getAbsolutePath().endsWith("/") ? testDir.getAbsolutePath()
- : testDir.getAbsolutePath() + "/");
- task.setTaskConfig(config);
+ WorkflowTask task = utils.getTask(testDir);
Metadata met = new Metadata();
- met.addMetadata("StartDateTime", DateUtils.toString(Calendar.getInstance()));
+ met.addMetadata("StartDateTime", DateUtils.toString(Calendar.getInstance()));
+
try {
runner.execute(task, met);
runner.execute(task, met);
@@ -84,12 +76,7 @@ public class TestAsynchronousLocalEngine
for (File f : this.testDir.listFiles()) {
BufferedReader br = null;
try {
- br = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
- String line = null;
- while ((line = br.readLine()) != null) {
-
- }
-
+ String line = FileUtils.readFileToString(f);
String[] toks = line.split(",");
Date dateTime = DateConvert.isoParse(toks[1]);
Seconds seconds = Seconds.secondsBetween(new DateTime(dateTime),
@@ -130,6 +117,7 @@ public class TestAsynchronousLocalEngine
testDir = new File(testJobDirPath);
testDir.mkdirs();
this.runner = new AsynchronousLocalEngineRunner();
+ this.utils = new QuerierAndRunnerUtils();
}
/*
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java?rev=1367859&r1=1367858&r2=1367859&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskQuerier.java Wed Aug 1 05:16:41 2012
@@ -17,18 +17,8 @@
package org.apache.oodt.cas.workflow.engine;
-//JDK imports
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Vector;
-
//OODT imports
-import org.apache.oodt.cas.workflow.lifecycle.WorkflowLifecycleManager;
import org.apache.oodt.cas.workflow.structs.FILOPrioritySorter;
-import org.apache.oodt.cas.workflow.structs.Priority;
-import org.apache.oodt.cas.workflow.structs.Workflow;
-import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
//Junit imports
import junit.framework.TestCase;
@@ -48,6 +38,26 @@ public class TestTaskQuerier extends Tes
public TestTaskQuerier() {
this.dateGen = 0;
}
+
+ /**
+  * Verifies that {@link TaskQuerier#getNext()} hands back a processor and
+  * removes it from the querier's runnable list.
+  */
+ public void testGetNext() {
+   FILOPrioritySorter prioritizer = new FILOPrioritySorter();
+   MockProcessorQueue processorQueue = new MockProcessorQueue();
+   assertNotNull(processorQueue.getProcessors());
+   assertEquals(3, processorQueue.getProcessors().size());
+   TaskQuerier querier = new TaskQuerier(processorQueue, prioritizer);
+   Thread querierThread = new Thread(querier);
+   querierThread.start();
+
+   // Bounded wait: fail instead of spinning forever if the querier never
+   // publishes the two runnable processors.
+   long deadline = System.currentTimeMillis() + 30000L;
+   while (querier.getRunnableProcessors().size() != 2) {
+     assertNotNull(querier.getRunnableProcessors());
+     if (System.currentTimeMillis() > deadline) {
+       querier.setRunning(false);
+       fail("TaskQuerier did not publish 2 runnable processors within 30 seconds");
+     }
+     Thread.yield();
+   }
+
+   // Stop the querier and wait for its thread to finish before asserting, so
+   // that it cannot swap in a fresh list between getNext() and the size check.
+   querier.setRunning(false);
+   try {
+     querierThread.join(30000L);
+   } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     fail("Interrupted while waiting for the TaskQuerier thread to stop");
+   }
+
+   assertNotNull(querier.getRunnableProcessors());
+   assertEquals(2, querier.getRunnableProcessors().size());
+   TaskProcessor next = querier.getNext();
+   assertNotNull(next);
+   assertEquals(1, querier.getRunnableProcessors().size());
+ }
public void testGetRunnableProcessors() {
FILOPrioritySorter prioritizer = new FILOPrioritySorter();
@@ -66,10 +76,10 @@ public class TestTaskQuerier extends Tes
assertEquals(2, querier.getRunnableProcessors().size());
assertNotNull(querier.getRunnableProcessors().get(0));
assertNotNull(querier.getRunnableProcessors().get(0).getPriority());
- assertEquals(2.0, querier.getRunnableProcessors().get(0).getPriority()
- .getValue());
- assertEquals(7.0, querier.getRunnableProcessors().get(1).getPriority()
- .getValue());
+ assertEquals(2.1, querier.getRunnableProcessors().get(0).getPriority()
+ .getValue()); // extra .1 since it's a task
+ assertEquals(7.1, querier.getRunnableProcessors().get(1).getPriority()
+ .getValue()); // extra .1 since it's a task
try{
querierThread.join();
}
@@ -77,63 +87,4 @@ public class TestTaskQuerier extends Tes
}
- private WorkflowProcessor getProcessor(double priority, String stateName,
- String categoryName) throws InstantiationException, IllegalAccessException {
- WorkflowLifecycleManager lifecycleManager = new WorkflowLifecycleManager(
- "./src/main/resources/examples/wengine/wengine-lifecycle.xml");
- WorkflowInstance inst = new WorkflowInstance();
- Date sd = new Date();
- sd.setTime(sd.getTime() + (this.dateGen * 5000));
- this.dateGen++;
- inst.setStartDate(sd);
- inst.setId("winst-" + priority);
- Workflow workflow = new Workflow();
- workflow.setTasks(Collections.EMPTY_LIST);
- inst.setWorkflow(workflow);
- inst.setPriority(Priority.getPriority(priority));
- WorkflowProcessorBuilder builder = WorkflowProcessorBuilder.aWorkflowProcessor()
- .withLifecycleManager(lifecycleManager)
- .withPriority(priority);
- SequentialProcessor processor = (SequentialProcessor)builder.build(SequentialProcessor.class);
- processor.setWorkflowInstance(inst);
- processor.setState(lifecycleManager.getDefaultLifecycle().createState(
- stateName, categoryName, ""));
- assertNotNull(processor.getState());
- assertNotNull(processor.getState().getCategory());
- assertNotNull(processor.getState().getCategory().getName());
- List<WorkflowProcessor> runnables = new Vector<WorkflowProcessor>();
- TaskProcessor taskProcessor = (TaskProcessor)builder.build(TaskProcessor.class);
- taskProcessor.setState(lifecycleManager.getDefaultLifecycle().createState(
- "Queued", "waiting", ""));
- runnables.add(taskProcessor);
- processor.setSubProcessors(runnables);
- return processor;
- }
-
- class MockProcessorQueue extends WorkflowProcessorQueue {
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.oodt.cas.workflow.engine.WorkflowProcessorQueue#getProcessors
- * ()
- */
- @Override
- public synchronized List<WorkflowProcessor> getProcessors() {
- List<WorkflowProcessor> processors = new Vector<WorkflowProcessor>();
- try {
- processors.add(getProcessor(10.0, "Success", "done"));
- processors.add(getProcessor(2.0, "Loaded", "initial"));
- processors.add(getProcessor(7.0, "Loaded", "initial"));
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- return processors;
- }
-
- }
-
}