You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ma...@apache.org on 2012/02/21 22:17:29 UTC
svn commit: r1292037 - in
/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine:
PrioritizedQueueBasedWorkflowEngine.java ThreadPoolWorkflowEngine.java
Author: mattmann
Date: Tue Feb 21 21:17:28 2012
New Revision: 1292037
URL: http://svn.apache.org/viewvc?rev=1292037&view=rev
Log:
- OODT-215/OODT-310 WIP: Port WEngine to trunk
- add pqueue-based engine (the basis for wengine2)
- just a skeleton
- modifying thread pool wengine to work with OODT-381 runner framework
Added:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
Modified:
oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
Added: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java?rev=1292037&view=auto
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java (added)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/PrioritizedQueueBasedWorkflowEngine.java Tue Feb 21 21:17:28 2012
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oodt.cas.workflow.engine;
+
+import java.net.URL;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.structs.PrioritySorter;
+import org.apache.oodt.cas.workflow.structs.Workflow;
+import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
+import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
+import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
+import org.apache.oodt.commons.util.DateConvert;
+
+/**
+ * Skeleton of a {@link WorkflowEngine} that is meant to schedule workflow
+ * instances through a prioritized queue.
+ *
+ * <p>
+ * Apart from {@link #getInstanceRepository()} and
+ * {@link #setWorkflowManagerUrl(URL)}, the {@link WorkflowEngine} operations
+ * are still placeholders that do nothing and return <code>null</code>,
+ * <code>false</code> or <code>0</code>. The nested {@link QueueWorker} polls
+ * the instance repository, wraps the current task of each runnable instance in
+ * a {@link SequentialProcessor} and orders those processors with the
+ * configured {@link PrioritySorter}. Nothing in this class starts a
+ * {@link QueueWorker} yet.
+ * </p>
+ *
+ * @author mattmann
+ * @version $Revision$
+ *
+ */
+public class PrioritizedQueueBasedWorkflowEngine implements WorkflowEngine {
+
+  /* our log stream */
+  private static final Logger LOG = Logger
+      .getLogger(PrioritizedQueueBasedWorkflowEngine.class.getName());
+
+  /* how long the queue worker rests between two polling passes, in ms */
+  private static final long POLL_INTERVAL_MILLIS = 2000L;
+
+  /* how long the queue worker rests after each inspected instance, in ms */
+  private static final long BREATHER_MILLIS = 1L;
+
+  /* our instance repository */
+  private final WorkflowInstanceRepository repo;
+
+  /* orders the processors built for runnable instances */
+  private final PrioritySorter prioritizer;
+
+  /* the URL pointer to the parent Workflow Manager; null until set */
+  private URL wmgrUrl;
+
+  /* handed to every SequentialProcessor that the queue worker creates */
+  private final long conditionWait;
+
+  /**
+   * Creates the engine. The workflow manager URL starts out as
+   * <code>null</code>; provide it with {@link #setWorkflowManagerUrl(URL)}.
+   *
+   * @param repo
+   *          the repository that workflow instances are read from and written
+   *          back to
+   * @param prioritizer
+   *          sorts the processors created for runnable instances
+   * @param conditionWait
+   *          passed unchanged to each {@link SequentialProcessor}
+   */
+  public PrioritizedQueueBasedWorkflowEngine(WorkflowInstanceRepository repo,
+      PrioritySorter prioritizer, long conditionWait) {
+    this.repo = repo;
+    this.prioritizer = prioritizer;
+    this.wmgrUrl = null;
+    this.conditionWait = conditionWait;
+  }
+
+  /**
+   * Placeholder: starts nothing and always returns <code>null</code>.
+   *
+   * @see WorkflowEngine#startWorkflow(Workflow, Metadata)
+   */
+  @Override
+  public WorkflowInstance startWorkflow(Workflow workflow, Metadata metadata)
+      throws EngineException {
+    // TODO not implemented in this skeleton
+    return null;
+  }
+
+  /**
+   * Placeholder: does nothing.
+   *
+   * @see WorkflowEngine#stopWorkflow(String)
+   */
+  @Override
+  public void stopWorkflow(String workflowInstId) {
+    // TODO not implemented in this skeleton
+  }
+
+  /**
+   * Placeholder: does nothing.
+   *
+   * @see WorkflowEngine#pauseWorkflowInstance(String)
+   */
+  @Override
+  public void pauseWorkflowInstance(String workflowInstId) {
+    // TODO not implemented in this skeleton
+  }
+
+  /**
+   * Placeholder: does nothing.
+   *
+   * @see WorkflowEngine#resumeWorkflowInstance(String)
+   */
+  @Override
+  public void resumeWorkflowInstance(String workflowInstId) {
+    // TODO not implemented in this skeleton
+  }
+
+  /**
+   * @return the repository this engine was constructed with
+   * @see WorkflowEngine#getInstanceRepository()
+   */
+  @Override
+  public WorkflowInstanceRepository getInstanceRepository() {
+    return this.repo;
+  }
+
+  /**
+   * Placeholder: updates nothing and always returns <code>false</code>.
+   *
+   * @see WorkflowEngine#updateMetadata(String, Metadata)
+   */
+  @Override
+  public boolean updateMetadata(String workflowInstId, Metadata met) {
+    // TODO not implemented in this skeleton
+    return false;
+  }
+
+  /**
+   * Remembers the workflow manager URL; the queue worker hands it to every
+   * {@link SequentialProcessor} it creates afterwards.
+   *
+   * @param url
+   *          the URL pointer to the parent Workflow Manager
+   * @see WorkflowEngine#setWorkflowManagerUrl(URL)
+   */
+  @Override
+  public void setWorkflowManagerUrl(URL url) {
+    this.wmgrUrl = url;
+  }
+
+  /**
+   * Placeholder: always returns <code>0</code>.
+   *
+   * @see WorkflowEngine#getWallClockMinutes(String)
+   */
+  @Override
+  public double getWallClockMinutes(String workflowInstId) {
+    // TODO not implemented in this skeleton
+    return 0;
+  }
+
+  /**
+   * Placeholder: always returns <code>0</code>.
+   *
+   * @see WorkflowEngine#getCurrentTaskWallClockMinutes(String)
+   */
+  @Override
+  public double getCurrentTaskWallClockMinutes(String workflowInstId) {
+    // TODO not implemented in this skeleton
+    return 0;
+  }
+
+  /**
+   * Placeholder: always returns <code>null</code>.
+   *
+   * @see WorkflowEngine#getWorkflowInstanceMetadata(String)
+   */
+  @Override
+  public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
+    // TODO not implemented in this skeleton
+    return null;
+  }
+
+  /**
+   * Repeatedly scans the instance repository, turns the current task of every
+   * runnable workflow instance into a {@link SequentialProcessor} and sorts
+   * the processors of each scan with the engine's {@link PrioritySorter}.
+   *
+   * NOTE(review): the sorted processors are not handed to anything yet; they
+   * are discarded at the end of every scan.
+   */
+  class QueueWorker implements Runnable {
+
+    /* loop flag; only an interrupt clears it so far */
+    private boolean work;
+
+    public QueueWorker() {
+      this.work = true;
+    }
+
+    /**
+     * Runs one repository scan after the other, resting
+     * {@link #POLL_INTERVAL_MILLIS} between scans, until the worker is
+     * interrupted. A failing scan is logged and the loop carries on.
+     *
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+      try {
+        while (work) {
+          try {
+            queueRunnableInstances();
+          } catch (InterruptedException e) {
+            throw e;
+          } catch (Exception e) {
+            LOG.log(Level.WARNING,
+                "Unable to queue runnable workflow instances: "
+                    + e.getMessage(), e);
+          }
+
+          // rest between scans so the repository is not polled continuously
+          pause(POLL_INTERVAL_MILLIS);
+        }
+      } catch (InterruptedException e) {
+        // keep the interrupt visible to the owner of this thread and stop
+        Thread.currentThread().interrupt();
+        work = false;
+      }
+    }
+
+    /**
+     * Performs a single scan: builds a processor for each runnable instance
+     * and sorts the processors once all instances have been inspected.
+     */
+    private void queueRunnableInstances() throws Exception {
+      List<WorkflowInstance> instances;
+      synchronized (repo) {
+        instances = repo.getWorkflowInstances();
+      }
+      if (instances == null) {
+        return;
+      }
+
+      List<WorkflowProcessor> runnableProcessors = new Vector<WorkflowProcessor>();
+      for (WorkflowInstance instance : instances) {
+        if (!work) {
+          break;
+        }
+        if (isRunnableInstance(instance)) {
+          WorkflowProcessor processor = createProcessor(instance);
+          if (processor != null) {
+            runnableProcessors.add(processor);
+          }
+        }
+
+        // take a breather
+        pause(BREATHER_MILLIS);
+      }
+
+      prioritizer.sort(runnableProcessors);
+    }
+
+    /**
+     * Marks the instance as started and wraps its current task in a fresh
+     * single-task workflow instance driven by a {@link SequentialProcessor}.
+     *
+     * @return the processor, or <code>null</code> when the instance's current
+     *         task cannot be found (the instance is then left untouched)
+     */
+    private WorkflowProcessor createProcessor(WorkflowInstance instance)
+        throws Exception {
+      Workflow parent = instance.getWorkflow();
+      WorkflowTask task = parent == null ? null : getTask(parent.getTasks(),
+          instance.getCurrentTaskId());
+      if (task == null) {
+        LOG.log(Level.WARNING, "Skipping workflow instance: ["
+            + instance.getId() + "]: current task: ["
+            + instance.getCurrentTaskId() + "] not found in its workflow");
+        return null;
+      }
+
+      synchronized (repo) {
+        instance.setStatus(WorkflowStatus.STARTED);
+        repo.updateWorkflowInstance(instance);
+      }
+
+      Workflow workflow = new Workflow();
+      workflow.setId(instance.getId() + "-" + instance.getCurrentTaskId());
+      workflow.setName(task.getTaskName());
+      workflow.getTasks().add(task);
+
+      WorkflowInstance inst = new WorkflowInstance();
+      inst.setId(UUID.randomUUID().toString());
+      inst.setWorkflow(workflow);
+      inst.setCurrentTaskStartDateTimeIsoStr(DateConvert.isoFormat(new Date()));
+      inst.setPriority(instance.getPriority());
+      inst.setSharedContext(instance.getSharedContext());
+
+      return new SequentialProcessor(inst, repo, wmgrUrl, conditionWait);
+    }
+
+    /**
+     * Waits on this worker's monitor for at most the given time.
+     */
+    private void pause(long millis) throws InterruptedException {
+      synchronized (this) {
+        this.wait(millis);
+      }
+    }
+
+    /**
+     * @return the task with the given id, or <code>null</code> when the list
+     *         is <code>null</code> or holds no such task
+     */
+    private WorkflowTask getTask(List<WorkflowTask> tasks, String id) {
+      if (tasks == null || id == null) {
+        return null;
+      }
+      for (WorkflowTask task : tasks) {
+        if (id.equals(task.getTaskId())) {
+          return task;
+        }
+      }
+      return null;
+    }
+
+    /**
+     * An instance is runnable unless it is in error, finished, missing
+     * metadata or paused. An instance without any status is skipped instead of
+     * failing the whole scan.
+     *
+     * NOTE(review): STARTED is not excluded, so an instance that was already
+     * picked up is picked up again on the next scan - confirm this is wanted.
+     */
+    private boolean isRunnableInstance(WorkflowInstance instance) {
+      if (instance.getStatus() == null) {
+        return false;
+      }
+      return !instance.getStatus().equals(WorkflowStatus.ERROR)
+          && !instance.getStatus().equals(WorkflowStatus.FINISHED)
+          && !instance.getStatus().equals(WorkflowStatus.METADATA_MISSING)
+          && !instance.getStatus().equals(WorkflowStatus.PAUSED);
+    }
+  }
+
+}
Modified: oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java
URL: http://svn.apache.org/viewvc/oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java?rev=1292037&r1=1292036&r2=1292037&view=diff
==============================================================================
--- oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java (original)
+++ oodt/trunk/workflow/src/main/java/org/apache/oodt/cas/workflow/engine/ThreadPoolWorkflowEngine.java Tue Feb 21 21:17:28 2012
@@ -19,19 +19,27 @@ package org.apache.oodt.cas.workflow.eng
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.resource.structs.Job;
+import org.apache.oodt.cas.resource.structs.exceptions.JobExecutionException;
import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
+import org.apache.oodt.cas.workflow.structs.TaskJobInput;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
+import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
+import org.apache.oodt.cas.workflow.util.GenericWorkflowObjectFactory;
import org.apache.oodt.cas.workflow.engine.SequentialProcessor;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
+import org.apache.oodt.cas.workflow.metadata.CoreMetKeys;
import org.apache.oodt.commons.util.DateConvert;
//JDK imports
+import java.net.InetAddress;
import java.net.URL;
+import java.net.UnknownHostException;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
@@ -69,15 +77,16 @@ public class ThreadPoolWorkflowEngine im
/* our instance repository */
private WorkflowInstanceRepository instRep = null;
- /* our resource manager client */
- private XmlRpcResourceManagerClient rClient = null;
-
/* the URL pointer to the parent Workflow Manager */
private URL wmgrUrl = null;
/* how long to wait before checking whether a condition is satisfied. */
private long conditionWait;
+ private ConditionProcessor condProcessor;
+
+ private EngineRunner runner;
+
/**
* Default Constructor.
*
@@ -120,12 +129,17 @@ public class ThreadPoolWorkflowEngine im
workerMap = new HashMap();
- if (resUrl != null)
- rClient = new XmlRpcResourceManagerClient(resUrl);
+ if (resUrl != null) {
+ this.runner = new ResourceRunner(resUrl);
+ } else {
+ this.runner = new AsynchronousLocalEngineRunner();
+ }
this.conditionWait = Long.getLong(
"org.apache.oodt.cas.workflow.engine.preConditionWaitTime", 10)
.longValue();
+
+ this.condProcessor = new ConditionProcessor();
}
/*
@@ -137,7 +151,7 @@ public class ThreadPoolWorkflowEngine im
*/
public synchronized void pauseWorkflowInstance(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
- SequentialProcessor worker = ((ThreadedProcessor) workerMap
+ SequentialProcessor worker = ((ThreadedExecutor) workerMap
.get(workflowInstId)).getProcessor();
if (worker == null) {
LOG.log(Level.WARNING,
@@ -161,7 +175,7 @@ public class ThreadPoolWorkflowEngine im
*/
public synchronized void resumeWorkflowInstance(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
- SequentialProcessor worker = ((ThreadedProcessor) workerMap
+ SequentialProcessor worker = ((ThreadedExecutor) workerMap
.get(workflowInstId)).getProcessor();
if (worker == null) {
LOG.log(Level.WARNING,
@@ -173,7 +187,7 @@ public class ThreadPoolWorkflowEngine im
// also check to make sure that the worker is currently paused
// only can resume WorkflowInstances that are paused, right?
- if (!worker.isPaused()) {
+ if (true/*!worker.isPaused()*/) {
LOG.log(Level.WARNING,
"WorkflowEngine: Attempt to resume a workflow that "
+ "isn't paused currently: instance id: " + workflowInstId);
@@ -208,16 +222,15 @@ public class ThreadPoolWorkflowEngine im
wInst.setStatus(CREATED);
persistWorkflowInstance(wInst);
- SequentialProcessor worker = new SequentialProcessor(wInst,
- instRep, this.wmgrUrl, this.conditionWait);
- worker.setRClient(rClient);
+ SequentialProcessor worker = new SequentialProcessor(wInst, instRep,
+ this.wmgrUrl, this.conditionWait);
workerMap.put(wInst.getId(), worker);
wInst.setStatus(QUEUED);
persistWorkflowInstance(wInst);
try {
- pool.execute(new ThreadedProcessor(worker));
+ pool.execute(new ThreadedExecutor(worker, this.condProcessor));
} catch (InterruptedException e) {
throw new EngineException(e);
}
@@ -245,7 +258,7 @@ public class ThreadPoolWorkflowEngine im
*/
public synchronized boolean updateMetadata(String workflowInstId, Metadata met) {
// okay, try and look up that worker thread in our hash map
- SequentialProcessor worker = ((ThreadedProcessor) workerMap
+ SequentialProcessor worker = ((ThreadedExecutor) workerMap
.get(workflowInstId)).getProcessor();
if (worker == null) {
LOG.log(Level.WARNING,
@@ -290,7 +303,7 @@ public class ThreadPoolWorkflowEngine im
*/
public synchronized void stopWorkflow(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
- SequentialProcessor worker = ((ThreadedProcessor) workerMap
+ SequentialProcessor worker = ((ThreadedExecutor) workerMap
.get(workflowInstId)).getProcessor();
if (worker == null) {
LOG.log(Level.WARNING,
@@ -324,7 +337,7 @@ public class ThreadPoolWorkflowEngine im
*/
public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
- SequentialProcessor worker = ((ThreadedProcessor) workerMap
+ SequentialProcessor worker = ((ThreadedExecutor) workerMap
.get(workflowInstId)).getProcessor();
if (worker == null) {
// try and get the metadata
@@ -477,12 +490,19 @@ public class ThreadPoolWorkflowEngine im
}
}
- class ThreadedProcessor implements Runnable {
+ class ThreadedExecutor implements Runnable, CoreMetKeys {
private SequentialProcessor processor;
- public ThreadedProcessor(SequentialProcessor processor) {
+ private boolean running;
+
+ private ConditionProcessor conditionEvaluator;
+
+ /**
+ * Wraps the given processor so that run() can step through its runnable
+ * sub-processors.
+ *
+ * NOTE(review): running is initialised to false here and the loop in run()
+ * is guarded by it, so setRunning(true) presumably has to be called before
+ * run() can process any task - TODO confirm.
+ *
+ * @param processor
+ * the processor whose workflow instance and sub-processors run() uses
+ * @param conditionEvaluator
+ * used by run() to check the pre-conditions of a task
+ */
+ public ThreadedExecutor(SequentialProcessor processor,
+ ConditionProcessor conditionEvaluator) {
this.processor = processor;
+ this.running = false;
+ this.conditionEvaluator = conditionEvaluator;
}
/*
@@ -492,7 +512,90 @@ public class ThreadPoolWorkflowEngine im
*/
@Override
public void run() {
- processor.start();
+ // Stamp the workflow instance with its start time and persist it.
+ String startDateTimeIsoStr = DateConvert.isoFormat(new Date());
+ this.getProcessor().getWorkflowInstance()
+ .setStartDateTimeIsoStr(startDateTimeIsoStr);
+ this.getProcessor().persistWorkflowInstance();
+
+ // NOTE(review): the constructor sets running to false, so unless
+ // setRunning(true) was called beforehand this loop never executes - confirm.
+ // NOTE(review): nothing visible in the loop body removes the handled entry
+ // from getRunnableSubProcessors(); verify that the loop can terminate.
+ while (running && this.getProcessor().getRunnableSubProcessors() != null
+ && this.getProcessor().getRunnableSubProcessors().size() > 0) {
+ if (isPaused()) {
+ LOG.log(
+ Level.FINE,
+ "SequentialProcessor: Skipping execution: Paused: CurrentTask: "
+ + this.getProcessor().getTaskNameById(
+ this.getProcessor().getCurrentTaskId()));
+ // NOTE(review): retries immediately without waiting - looks like a busy
+ // loop for as long as the instance stays paused; confirm.
+ continue;
+ }
+
+ // Work on the first runnable sub-processor and record its task as the
+ // instance's current task. NOTE(review): the cast assumes that entry is a
+ // TaskProcessor.
+ TaskProcessor taskProcessor = (TaskProcessor) this.processor
+ .getRunnableSubProcessors().get(0);
+ WorkflowTask task = taskProcessor.getTask();
+ this.getProcessor().getWorkflowInstance()
+ .setCurrentTaskId(task.getTaskId());
+
+ this.getProcessor().persistWorkflowInstance();
+ // Give up on the whole workflow when the task's required metadata check
+ // fails: flag the instance, persist it and leave run().
+ if (!taskProcessor.checkTaskRequiredMetadata(this.getProcessor()
+ .getWorkflowInstance().getSharedContext())) {
+ this.getProcessor().getWorkflowInstance().setStatus(METADATA_MISSING);
+ this.getProcessor().persistWorkflowInstance();
+ return;
+ }
+
+ // Unsatisfied pre-conditions: mark the instance PAUSED (not persisted here)
+ // and retry. NOTE(review): once PAUSED is set, the isPaused() check at the
+ // top of the loop presumably skips every later iteration until something
+ // else changes the status - confirm.
+ if (task.getConditions() != null) {
+ if (!this.conditionEvaluator.satisfied(task.getConditions(),
+ task.getTaskId(), this.getProcessor().getWorkflowInstance()
+ .getSharedContext())
+ && isRunning()) {
+
+ LOG.log(Level.FINEST,
+ "Pre-conditions for task: " + task.getTaskName()
+ + " unsatisfied");
+
+ if (!isPaused()) {
+ this.getProcessor().getWorkflowInstance()
+ .setStatus(WorkflowStatus.PAUSED);
+ }
+ continue;
+ }
+ }
+ LOG.log(Level.FINEST, "Executing task: " + task.getTaskName());
+
+ // Publish task id, instance id, job id, host and manager URL into the
+ // instance's shared context before the task would be executed.
+ this.addStdWorkflowMetadata(getProcessor().getWorkflowInstance(), task,
+ getProcessor().getWorkflowInstance().getSharedContext(), wmgrUrl);
+
+ // NOTE(review): the runner.execute(...) calls in both branches below are
+ // commented out, so no task is actually executed here yet.
+ if (runner instanceof ResourceRunner) {
+ getProcessor().getWorkflowInstance().setStatus(RESMGR_SUBMIT);
+ //persistWorkflowInstance();
+ /*runner.execute(task, getProcessor().getWorkflowInstance()
+ .getSharedContext());*/
+ } else {
+ //this.workflowInstance.setStatus(STARTED);
+ //this.persistWorkflowInstance();
+ // NOTE(review): the two timestamps computed in this branch are only
+ // referenced by commented-out code, i.e. they are currently unused.
+ String currentTaskIsoStartDateTimeStr = DateConvert
+ .isoFormat(new Date());
+ //this.workflowInstance
+ //.setCurrentTaskStartDateTimeIsoStr(currentTaskIsoStartDateTimeStr);
+ //this.workflowInstance.setCurrentTaskEndDateTimeIsoStr(null);
+ /*runner.execute(task, getProcessor().getWorkflowInstance()
+ .getSharedContext());*/
+
+ String currentTaskIsoEndDateTimeStr = DateConvert
+ .isoFormat(new Date());
+ //this.workflowInstance
+ // .setCurrentTaskEndDateTimeIsoStr(currentTaskIsoEndDateTimeStr);
+ // this.persistWorkflowInstance();
+ }
+
+ LOG.log(Level.FINEST, "Completed task: " + task.getTaskName());
+
+ }
+
+ /*LOG.log(Level.FINEST, "Completed workflow: "
+ + this.workflowInstance.getWorkflow().getName());
+ if (isRunning()) {
+ stop();
+ }*/
}
/**
@@ -510,6 +613,62 @@ public class ThreadPoolWorkflowEngine im
this.processor = processor;
}
+ /**
+  * Reports the flag that guards the task loop in {@link #run()}.
+  *
+  * @return the current value of the running flag
+  */
+ public boolean isRunning() {
+   return this.running;
+ }
+
+ /**
+ * Sets the flag that guards the task loop in run(); the constructor
+ * initialises it to false.
+ *
+ * @param running
+ * the new value of the running flag
+ */
+ public void setRunning(boolean running) {
+ this.running = running;
+ }
+
+ /**
+  * Gives access to the evaluator that {@link #run()} uses to check the
+  * pre-conditions of a task.
+  *
+  * @return the condition evaluator currently in use
+  */
+ public ConditionProcessor getConditionEvaluator() {
+   return this.conditionEvaluator;
+ }
+
+ /**
+ * Replaces the evaluator that run() uses to check the pre-conditions of a
+ * task.
+ *
+ * @param conditionEvaluator
+ * the condition evaluator to use from now on
+ */
+ public void setConditionEvaluator(ConditionProcessor conditionEvaluator) {
+ this.conditionEvaluator = conditionEvaluator;
+ }
+
+ /**
+  * Tells whether the wrapped processor's workflow instance is paused.
+  *
+  * @return <code>true</code> only when the instance status equals
+  *         {@link WorkflowStatus#PAUSED}; <code>false</code> otherwise,
+  *         including when no status has been set
+  */
+ public boolean isPaused() {
+   // constant-first comparison: an instance without a status must not raise
+   // a NullPointerException inside the run() loop
+   return WorkflowStatus.PAUSED.equals(this.getProcessor()
+       .getWorkflowInstance().getStatus());
+ }
+
+ /**
+  * Writes the standard workflow keys into the given context, replacing any
+  * values already stored under them: task id, workflow instance id, job id
+  * (also the instance id), processing node and workflow manager URL.
+  *
+  * @param wInst
+  *          the workflow instance whose id is published
+  * @param task
+  *          the task whose id is published
+  * @param ctx
+  *          the metadata to update in place
+  * @param wUrl
+  *          the workflow manager URL; when <code>null</code> the
+  *          WORKFLOW_MANAGER_URL key is left as it is
+  */
+ protected void addStdWorkflowMetadata(WorkflowInstance wInst,
+     WorkflowTask task, Metadata ctx, URL wUrl) {
+   ctx.replaceMetadata(TASK_ID, task.getTaskId());
+   ctx.replaceMetadata(WORKFLOW_INST_ID, wInst.getId());
+   ctx.replaceMetadata(JOB_ID, wInst.getId());
+   ctx.replaceMetadata(PROCESSING_NODE, getHostname());
+   // the engine's manager URL defaults to null until it has been set, so
+   // guard the dereference instead of failing the task with an NPE
+   if (wUrl != null) {
+     ctx.replaceMetadata(WORKFLOW_MANAGER_URL, wUrl.toString());
+   }
+ }
+
+ /**
+  * Looks up the name of the local host.
+  *
+  * @return the local host name, or <code>null</code> when it cannot be
+  *         resolved (the failure is logged)
+  */
+ protected String getHostname() {
+   try {
+     return InetAddress.getLocalHost().getHostName();
+   } catch (UnknownHostException e) {
+     // callers keep receiving null, but the cause is no longer lost
+     LOG.log(Level.WARNING,
+         "Unable to determine local hostname: " + e.getMessage(), e);
+     return null;
+   }
+ }
+
}
}