You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/08/18 06:47:11 UTC
svn commit: r1374513 - in /oodt/trunk/workflow/src:
main/java/org/apache/oodt/cas/workflow/engine/
main/java/org/apache/oodt/cas/workflow/engine/runner/
test/org/apache/oodt/cas/workflow/engine/
Author: mattmann
Date: Sat Aug 18 04:47:10 2012
New Revision: 1374513
URL: http://svn.apache.org/viewvc?rev=1374513&view=rev
Log:
- OODT-310 WIP: start fleshing out the PQueueWorkflowEngine
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Sat Aug 18 04:47:10 2012
@@ -42,26 +42,20 @@ import org.apache.oodt.cas.workflow.stru
import org.apache.oodt.commons.util.DateConvert;
/**
- *
+ *
* Describe your class here.
+ *
* @author mattmann
* @author bfoster
* @version $Revision$
- *
+ *
*/
public class PrioritizedQueueBasedWorkflowEngine implements WorkflowEngine {
-
- private static final Logger LOG = Logger.getLogger(PrioritizedQueueBasedWorkflowEngine.class.getName());
-
- //private Map<String, WorkflowPro> processorQueue;
- private final List<WorkflowProcessor> runnableTasks;
- private final Map<String, WorkflowProcessor> executingTasks;
- //private WorkflowProcessorLock processorLock;
- private List<String> metadataKeysToCache;
- private boolean debugMode;
- private final boolean allowQueuerToWork;
+ private static final Logger LOG = Logger
+ .getLogger(PrioritizedQueueBasedWorkflowEngine.class.getName());
private final Thread queuerThread;
+ private final Thread runnerThread;
private final WorkflowInstanceRepository repo;
private final PrioritySorter prioritizer;
private WorkflowProcessorQueue processorQueue;
@@ -69,49 +63,60 @@ public class PrioritizedQueueBasedWorkfl
private final long conditionWait;
private EngineRunner runner;
- public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo, PrioritySorter prioritizer, long conditionWait){
+ public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo,
+ PrioritySorter prioritizer, long conditionWait) {
this.repo = repo;
- this.prioritizer = prioritizer != null ? new HighestFIFOPrioritySorter(1, 50, 1/*secondsBetweenBoosts, boostAmount, boostCap*/):
- prioritizer;
+ this.prioritizer = prioritizer != null ? new HighestFIFOPrioritySorter(1,
+ 50, 1) : prioritizer;
this.wmgrUrl = null;
this.conditionWait = conditionWait;
this.processorQueue = new WorkflowProcessorQueue();
- this.runnableTasks = new Vector<WorkflowProcessor>();
- this.executingTasks = Collections.synchronizedMap(new HashMap<String, WorkflowProcessor>());
- //this.processorLock = new WorkflowProcessorLock();
- if (metadataKeysToCache != null)
- this.metadataKeysToCache = new Vector<String>(metadataKeysToCache);
-// this.debugMode = debugMode;
- this.allowQueuerToWork = true;
-
- /* try {
- this.loadProcessorRepo();
- }catch (Exception e) {
- e.printStackTrace();
- }*/
+
// Task QUEUER thread
- queuerThread = new Thread(new TaskQuerier(processorQueue, prioritizer));
+ TaskQuerier querier = new TaskQuerier(processorQueue, this.prioritizer);
+ queuerThread = new Thread(querier);
queuerThread.start();
+
+ // Task Runner thread
+ runnerThread = new Thread(new TaskRunner(querier, runner));
+ runnerThread.start();
+
}
@Override
public void setEngineRunner(EngineRunner runner) {
- this.runner = runner;
+ this.runner = runner;
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache.oodt.cas.workflow.structs.Workflow, org.apache.oodt.cas.metadata.Metadata)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache
+ * .oodt.cas.workflow.structs.Workflow, org.apache.oodt.cas.metadata.Metadata)
*/
@Override
public WorkflowInstance startWorkflow(Workflow workflow, Metadata metadata)
throws EngineException {
// TODO Auto-generated method stub
+
+ //looks like the work to do here is
+ // create a new WorkflowInstance
+ // create a new WorkflowProcessor around it
+ // set it in Queued status
+ // commit it to workflow instance repo and it will get picked up
+ // by the runner thread
+
return null;
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
+ * .String)
*/
@Override
public void stopWorkflow(String workflowInstId) {
@@ -119,8 +124,12 @@ public class PrioritizedQueueBasedWorkfl
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance(java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
+ * (java.lang.String)
*/
@Override
public void pauseWorkflowInstance(String workflowInstId) {
@@ -128,8 +137,12 @@ public class PrioritizedQueueBasedWorkfl
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance(java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
+ * (java.lang.String)
*/
@Override
public void resumeWorkflowInstance(String workflowInstId) {
@@ -137,17 +150,23 @@ public class PrioritizedQueueBasedWorkfl
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
*/
@Override
public WorkflowInstanceRepository getInstanceRepository() {
- // TODO Auto-generated method stub
- return null;
+ return this.repo;
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.lang.String, org.apache.oodt.cas.metadata.Metadata)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.
+ * lang.String, org.apache.oodt.cas.metadata.Metadata)
*/
@Override
public boolean updateMetadata(String workflowInstId, Metadata met) {
@@ -155,8 +174,12 @@ public class PrioritizedQueueBasedWorkfl
return false;
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl(java.net.URL)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl
+ * (java.net.URL)
*/
@Override
public void setWorkflowManagerUrl(URL url) {
@@ -164,8 +187,12 @@ public class PrioritizedQueueBasedWorkfl
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
+ * java.lang.String)
*/
@Override
public double getWallClockMinutes(String workflowInstId) {
@@ -173,8 +200,11 @@ public class PrioritizedQueueBasedWorkfl
return 0;
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getCurrentTaskWallClockMinutes(java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#
+ * getCurrentTaskWallClockMinutes(java.lang.String)
*/
@Override
public double getCurrentTaskWallClockMinutes(String workflowInstId) {
@@ -182,8 +212,12 @@ public class PrioritizedQueueBasedWorkfl
return 0;
}
- /* (non-Javadoc)
- * @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata(java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata
+ * (java.lang.String)
*/
@Override
public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
@@ -191,8 +225,6 @@ public class PrioritizedQueueBasedWorkfl
return null;
}
+ // FIXME: add in methods from WEngine
- //FIXME: add in methods from WEngine
-
- }
-
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/TaskRunner.java Sat Aug 18 04:47:10 2012
@@ -65,8 +65,7 @@ public class TaskRunner implements Runna
private static final Logger LOG = Logger
.getLogger(TaskRunner.class.getName());
- public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner,
- int waitSeconds) {
+ public TaskRunner(TaskQuerier taskQuerier, EngineRunner runner) {
this.running = true;
this.taskQuerier = taskQuerier;
this.runner = runner;
@@ -89,7 +88,6 @@ public class TaskRunner implements Runna
try {
if (nextTaskProcessor != null && runner.hasOpenSlots(nextTask)) {
- // TODO: set Workflow met here?
runner.execute(nextTask, nextTaskProcessor.getDynamicMetadata());
}
} catch (Exception e) {
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/runner/AsynchronousLocalEngineRunner.java Sat Aug 18 04:47:10 2012
@@ -21,10 +21,8 @@ package org.apache.oodt.cas.workflow.eng
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -47,7 +45,7 @@ public class AsynchronousLocalEngineRunn
public static final int DEFAULT_NUM_THREADS = 25;
- private final ThreadPoolExecutor executor;
+ private final ExecutorService executor;
private final Map<String, Thread> workerMap;
public AsynchronousLocalEngineRunner() {
@@ -55,17 +53,7 @@ public class AsynchronousLocalEngineRunn
}
public AsynchronousLocalEngineRunner(int numThreads) {
- this.executor = new ThreadPoolExecutor(numThreads, numThreads, 30,
- TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
- new RejectedExecutionHandler() {
-
- @Override
- public void rejectedExecution(Runnable workflow,
- ThreadPoolExecutor executor) {
- // TODO Auto-generated method stub
-
- }
- });
+ this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS);
this.workerMap = new HashMap<String, Thread>();
}
@@ -110,7 +98,7 @@ public class AsynchronousLocalEngineRunn
};
- String id = "";
+ String id = UUID.randomUUID().toString();
synchronized (id) {
id = UUID.randomUUID().toString();
this.workerMap.put(id, worker);
Modified: oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java?rev=1374513&r1=1374512&r2=1374513&view=diff
==============================================================================
--- oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java (original)
+++ oodt/trunk/workflow/src/test/org/apache/oodt/cas/workflow/engine/TestTaskRunner.java Sat Aug 18 04:47:10 2012
@@ -63,7 +63,7 @@ public class TestTaskRunner extends Test
assertNotNull(runnables);
assertEquals(2, runnables.size());
runner = new AsynchronousLocalEngineRunner();
- taskRunner = new TaskRunner(querier, runner, 2);
+ taskRunner = new TaskRunner(querier, runner);
assertNotNull(taskRunner);
Thread runnerThread = new Thread(taskRunner);
WorkflowTask task = taskRunner